This is an automated email from the ASF dual-hosted git repository.

blaginin pushed a commit to branch annarose/dict-coercion
in repository https://gitbox.apache.org/repos/asf/datafusion-sandbox.git

commit 9962911ee5d5cdde5f76a4489ee7d432365dde0c
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Mon Feb 2 10:58:48 2026 -0600

    feat: add ExpressionPlacement enum for optimizer expression placement decisions (#20065)
    
    ## Summary
    
    This PR is part of work towards
    https://github.com/apache/datafusion/issues/19387
    
    Extracts the `ExpressionPlacement` enum from apache/datafusion#20036 to
    provide a mechanism for expressions to indicate where they should be
    placed in the query plan for optimal execution.
    
    I've opted to go the route of having expressions declare their behavior
    via a new API on `enum Expr` and `trait PhysicalExpr`:
    
    ```rust
    impl Expr {
        pub fn placement(&self) -> ExpressionPlacement { ... }
        ...
    }
    ```
    And:
    
    ```rust
    trait PhysicalExpr {
        fn placement(&self) -> ExpressionPlacement { ... }
    }
    ```
    
    Where `ExpressionPlacement`:
    
    ```rust
    enum ExpressionPlacement {
        /// Argument is a literal constant value or an expression that can be
        /// evaluated to a constant at planning time.
        Literal,
        /// Argument is a simple column reference.
        Column,
        /// Argument is a complex expression that can be safely placed at leaf
        /// nodes. For example, if `get_field(struct_col, 'field_name')` is
        /// implemented as a leaf-pushable expression, then it would return this
        /// variant. Then `other_leaf_function(get_field(...), 42)` could also be
        /// classified as leaf-pushable using the knowledge that `get_field(...)`
        /// is leaf-pushable.
        PlaceAtLeaves,
        /// Argument is a complex expression that should be placed at root nodes.
        /// For example, `min(col1 + col2)` is not leaf-pushable because it
        /// requires per-row computation.
        PlaceAtRoot,
    }
    ```
    
    We arrived at `ExpressionPlacement` after iterating through a version that
    had:
    
    ```rust
    enum ArgTriviality {
        Literal,
        Column,
        Trivial,
        NonTrivial,
    }
    ```
    
    This terminology came from existing concepts in the codebase that were
    sprinkled around various places in the logical and physical layers. Some
    examples:
    
    https://github.com/apache/datafusion/blob/f819061833d0ee4d7899ed6a0a431c584533b241/datafusion/physical-plan/src/projection.rs#L282-L290
    
    https://github.com/apache/datafusion/blob/f819061833d0ee4d7899ed6a0a431c584533b241/datafusion/physical-plan/src/projection.rs#L1120-L1125
    
    https://github.com/apache/datafusion/blob/f819061833d0ee4d7899ed6a0a431c584533b241/datafusion/optimizer/src/optimize_projections/mod.rs#L589-L592
    
    The new API adds a distinction the old checks lacked: an expression such
    as `get_field(col, 'a')` is neither a column nor a literal, but it is
    still trivial.
    
    It also gives scalar functions the ability to classify themselves.
    This part was a bit tricky because `ScalarUDFImpl` (the scalar function
    trait that users implement) lives in `datafusion-expr`, which cannot
    reference `datafusion-physical-expr-common` (where `PhysicalExpr` is
    defined).
    But once we are in the physical layer, scalar functions are represented
    as `func: ScalarUDFImpl, args: Vec<Arc<dyn PhysicalExpr>>`.
    Since a trait method on `ScalarUDFImpl` cannot reference `PhysicalExpr`,
    there would be no way to ask a function to classify itself in the
    physical layer.
    
    Additionally, even if we could refer to `PhysicalExpr` from the
    `ScalarUDFImpl` trait, we would then need two methods with similar but
    divergent logic (matching on the `Expr` enum in one, downcasting to
    various known types in the physical version), which adds boilerplate for
    implementers.
    
    The `ExpressionPlacement` enum solves this problem: we can have a single
    method `ScalarUDFImpl::placement(args: &[ExpressionPlacement])`.
    The parent of `ScalarUDFImpl` will call either `Expr::placement` or
    `PhysicalExpr::placement` on each argument, depending on which one it
    has, and pass the resulting placements in.
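    
    A minimal, self-contained sketch of this dispatch follows. It is not the
    DataFusion code: `ToyFn`, `ToyExpr`, `ToyPhysicalExpr`, `ToyGetField`,
    `ToyExpensive`, `PhysColumn`, `PhysLiteral` and `PhysCall` are
    hypothetical stand-ins for `ScalarUDFImpl`, `Expr`, `PhysicalExpr` and
    their implementations. The variant names follow the diff in this commit
    (`MoveTowardsLeafNodes` / `KeepInPlace`) rather than the names used in
    this description.
    
    ```rust
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum ExpressionPlacement {
        Literal,
        Column,
        MoveTowardsLeafNodes,
        KeepInPlace,
    }
    
    /// Hypothetical stand-in for `ScalarUDFImpl`: the function only sees the
    /// placements of its arguments, never the argument expressions themselves.
    pub trait ToyFn {
        fn placement(&self, _args: &[ExpressionPlacement]) -> ExpressionPlacement {
            ExpressionPlacement::KeepInPlace
        }
    }
    
    /// Mirrors the `get_field` rule: pushable base, literal keys.
    pub struct ToyGetField;
    impl ToyFn for ToyGetField {
        fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
            let base_is_pushable = matches!(
                args.first(),
                Some(ExpressionPlacement::Column | ExpressionPlacement::MoveTowardsLeafNodes)
            );
            let keys_are_literals = args
                .iter()
                .skip(1)
                .all(|p| *p == ExpressionPlacement::Literal);
            if base_is_pushable && keys_are_literals {
                ExpressionPlacement::MoveTowardsLeafNodes
            } else {
                ExpressionPlacement::KeepInPlace
            }
        }
    }
    
    /// A function that keeps the trait default (`KeepInPlace`).
    pub struct ToyExpensive;
    impl ToyFn for ToyExpensive {}
    
    /// Hypothetical stand-in for the logical `Expr` enum.
    pub enum ToyExpr {
        Column(String),
        Literal(String),
        Call(Box<dyn ToyFn>, Vec<ToyExpr>),
    }
    
    impl ToyExpr {
        pub fn placement(&self) -> ExpressionPlacement {
            match self {
                ToyExpr::Column(_) => ExpressionPlacement::Column,
                ToyExpr::Literal(_) => ExpressionPlacement::Literal,
                ToyExpr::Call(fun, args) => {
                    let arg_placements: Vec<_> =
                        args.iter().map(|a| a.placement()).collect();
                    fun.placement(&arg_placements)
                }
            }
        }
    }
    
    /// Hypothetical stand-in for `dyn PhysicalExpr`.
    pub trait ToyPhysicalExpr {
        fn placement(&self) -> ExpressionPlacement {
            ExpressionPlacement::KeepInPlace
        }
    }
    
    pub struct PhysColumn;
    impl ToyPhysicalExpr for PhysColumn {
        fn placement(&self) -> ExpressionPlacement {
            ExpressionPlacement::Column
        }
    }
    
    pub struct PhysLiteral;
    impl ToyPhysicalExpr for PhysLiteral {
        fn placement(&self) -> ExpressionPlacement {
            ExpressionPlacement::Literal
        }
    }
    
    pub struct PhysCall {
        pub fun: Box<dyn ToyFn>,
        pub args: Vec<Box<dyn ToyPhysicalExpr>>,
    }
    impl ToyPhysicalExpr for PhysCall {
        fn placement(&self) -> ExpressionPlacement {
            let arg_placements: Vec<_> =
                self.args.iter().map(|a| a.placement()).collect();
            self.fun.placement(&arg_placements)
        }
    }
    ```
    
    In this sketch the same `ToyGetField::placement` answers for both trees,
    so a function author writes the classification rule once and neither
    layer needs to match on or downcast the other layer's expression types.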
    
    ## Changes
    
    - Add `ExpressionPlacement` enum in `datafusion-expr-common` with four
      variants:
      - `Literal` - constant values
      - `Column` - simple column references
      - `PlaceAtLeaves` - cheap expressions (like `get_field`) that can be
        pushed to leaf nodes
      - `PlaceAtRoot` - expensive expressions that should stay at root
    
    - Add `placement()` method to:
      - `Expr` enum
      - `ScalarUDF` / `ScalarUDFImpl` traits (with default returning
        `PlaceAtRoot`)
      - `PhysicalExpr` trait (with default returning `PlaceAtRoot`)
      - Physical expression implementations for `Column`, `Literal`, and
        `ScalarFunctionExpr`
    
    - Implement `placement()` for `GetFieldFunc` that returns
    `PlaceAtLeaves` when accessing struct fields with literal keys
    
    - Replace `is_expr_trivial()` function checks with `placement()` checks
    in:
      - `datafusion/optimizer/src/optimize_projections/mod.rs`
      - `datafusion/physical-plan/src/projection.rs`
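    
    The replaced check in the projection-merging code can be sketched roughly
    as below. This is a simplified model, not the optimizer code:
    `should_merge` and its `(placement, usage)` input are hypothetical, while
    `should_push_to_leaves` mirrors the method added in `placement.rs` in
    this commit (using the variant names from the diff).
    
    ```rust
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum ExpressionPlacement {
        Literal,
        Column,
        MoveTowardsLeafNodes,
        KeepInPlace,
    }
    
    impl ExpressionPlacement {
        /// Same rule as in the diff: only columns and cheap leaf-pushable
        /// expressions qualify.
        pub fn should_push_to_leaves(&self) -> bool {
            matches!(
                self,
                ExpressionPlacement::Column | ExpressionPlacement::MoveTowardsLeafNodes
            )
        }
    }
    
    /// Hypothetical helper. Each entry describes one expression of the inner
    /// projection: its placement and how many times the outer projection
    /// refers to it. Merging is skipped when an expression that is referenced
    /// more than once does not qualify for push-down, since keeping it in its
    /// own projection computes it once.
    pub fn should_merge(inner: &[(ExpressionPlacement, usize)]) -> bool {
        !inner
            .iter()
            .any(|(placement, usage)| *usage > 1 && !placement.should_push_to_leaves())
    }
    ```
    
    Note that `Literal` does not satisfy `should_push_to_leaves`, whereas the
    old `is_expr_trivial` accepted literals. Under this model a literal that
    is referenced twice now blocks the merge, which appears consistent with
    the extra `ProjectionExec` in the updated `aggregate.slt` plans.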
    
    ## Test Plan
    
    - [x] `cargo check` passes on all affected packages
    - [x] `cargo test -p datafusion-optimizer` passes
    - [x] `cargo test -p datafusion-physical-plan` passes (except unrelated
    zstd feature test)
    - [x] `cargo test -p datafusion-functions --lib getfield` passes
    
    🤖 Generated with [Claude Code](https://claude.ai/code)
    
    ---------
    
    Co-authored-by: Claude Opus 4.5 <[email protected]>
---
 datafusion/expr-common/src/lib.rs                  |   3 +
 datafusion/expr-common/src/placement.rs            |  62 +++++++++++
 datafusion/expr/src/expr.rs                        |  18 ++++
 datafusion/expr/src/lib.rs                         |   1 +
 datafusion/expr/src/udf.rs                         |  26 +++++
 datafusion/functions/src/core/getfield.rs          | 118 ++++++++++++++++++++-
 .../optimizer/src/common_subexpr_eliminate.rs      |   5 +
 .../optimizer/src/optimize_projections/mod.rs      |  14 +--
 .../physical-expr-common/src/physical_expr.rs      |  11 ++
 datafusion/physical-expr/src/expressions/column.rs |   5 +
 .../physical-expr/src/expressions/literal.rs       |   5 +
 datafusion/physical-expr/src/scalar_function.rs    |  10 +-
 datafusion/physical-plan/src/projection.rs         |  27 ++---
 datafusion/sqllogictest/test_files/aggregate.slt   |  15 +--
 .../test_files/projection_pushdown.slt             |  22 +++-
 datafusion/sqllogictest/test_files/unnest.slt      |   4 +-
 16 files changed, 307 insertions(+), 39 deletions(-)

diff --git a/datafusion/expr-common/src/lib.rs b/datafusion/expr-common/src/lib.rs
index 0018694d1..c9a95fd29 100644
--- a/datafusion/expr-common/src/lib.rs
+++ b/datafusion/expr-common/src/lib.rs
@@ -40,7 +40,10 @@ pub mod dyn_eq;
 pub mod groups_accumulator;
 pub mod interval_arithmetic;
 pub mod operator;
+pub mod placement;
 pub mod signature;
 pub mod sort_properties;
 pub mod statistics;
 pub mod type_coercion;
+
+pub use placement::ExpressionPlacement;
diff --git a/datafusion/expr-common/src/placement.rs b/datafusion/expr-common/src/placement.rs
new file mode 100644
index 000000000..8212ba618
--- /dev/null
+++ b/datafusion/expr-common/src/placement.rs
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Expression placement information for optimization decisions.
+
+/// Describes where an expression should be placed in the query plan for
+/// optimal execution. This is used by optimizers to make decisions about
+/// expression placement, such as whether to push expressions down through
+/// projections.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum ExpressionPlacement {
+    /// A constant literal value.
+    Literal,
+    /// A simple column reference.
+    Column,
+    /// A cheap expression that can be pushed to leaf nodes in the plan.
+    /// Examples include `get_field` for struct field access.
+    /// Pushing these expressions down in the plan can reduce data early
+    /// at low compute cost.
+    /// See [`ExpressionPlacement::should_push_to_leaves`] for details.
+    MoveTowardsLeafNodes,
+    /// An expensive expression that should stay where it is in the plan.
+    /// Examples include complex scalar functions or UDFs.
+    KeepInPlace,
+}
+
+impl ExpressionPlacement {
+    /// Returns true if the expression can be pushed down to leaf nodes
+    /// in the query plan.
+    ///
+    /// This returns true for:
+    /// - [`ExpressionPlacement::Column`]: Simple column references can be pushed down. They do no compute and do not increase or
+    ///   decrease the amount of data being processed.
+    ///   A projection that reduces the number of columns can eliminate unnecessary data early,
+    ///   but this method only considers one expression at a time, not a projection as a whole.
+    /// - [`ExpressionPlacement::MoveTowardsLeafNodes`]: Cheap expressions can be pushed down to leaves to take advantage of
+    ///   early computation and potential optimizations at the data source level.
+    ///   For example `struct_col['field']` is cheap to compute (just an Arc clone of the nested array for `'field'`)
+    ///   and thus can reduce data early in the plan at very low compute cost.
+    ///   It may even be possible to eliminate the expression entirely if the data source can project only the needed field
+    ///   (as e.g. Parquet can).
+    pub fn should_push_to_leaves(&self) -> bool {
+        matches!(
+            self,
+            ExpressionPlacement::Column | ExpressionPlacement::MoveTowardsLeafNodes
+        )
+    }
+}
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 8eae81bc5..9bf0ac58c 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -38,6 +38,7 @@ use datafusion_common::tree_node::{
 use datafusion_common::{
     Column, DFSchema, HashMap, Result, ScalarValue, Spans, TableReference,
 };
+use datafusion_expr_common::placement::ExpressionPlacement;
 use datafusion_functions_window_common::field::WindowUDFFieldArgs;
 #[cfg(feature = "sql")]
 use sqlparser::ast::{
@@ -1536,6 +1537,23 @@ impl Expr {
         }
     }
 
+    /// Returns placement information for this expression.
+    ///
+    /// This is used by optimizers to make decisions about expression placement,
+    /// such as whether to push expressions down through projections.
+    pub fn placement(&self) -> ExpressionPlacement {
+        match self {
+            Expr::Column(_) => ExpressionPlacement::Column,
+            Expr::Literal(_, _) => ExpressionPlacement::Literal,
+            Expr::ScalarFunction(func) => {
+                let arg_placements: Vec<_> =
+                    func.args.iter().map(|arg| arg.placement()).collect();
+                func.func.placement(&arg_placements)
+            }
+            _ => ExpressionPlacement::KeepInPlace,
+        }
+    }
+
     /// Return String representation of the variant represented by `self`
     /// Useful for non-rust based bindings
     pub fn variant_name(&self) -> &str {
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index 201f7a025..cb136229b 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -95,6 +95,7 @@ pub use datafusion_expr_common::accumulator::Accumulator;
 pub use datafusion_expr_common::columnar_value::ColumnarValue;
 pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
 pub use datafusion_expr_common::operator::Operator;
+pub use datafusion_expr_common::placement::ExpressionPlacement;
 pub use datafusion_expr_common::signature::{
     ArrayFunctionArgument, ArrayFunctionSignature, Coercion, Signature,
     TIMEZONE_WILDCARD, TypeSignature, TypeSignatureClass, Volatility,
diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs
index 2183bdbea..405fb2568 100644
--- a/datafusion/expr/src/udf.rs
+++ b/datafusion/expr/src/udf.rs
@@ -31,6 +31,7 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_common::{ExprSchema, Result, ScalarValue, not_impl_err};
 use datafusion_expr_common::dyn_eq::{DynEq, DynHash};
 use datafusion_expr_common::interval_arithmetic::Interval;
+use datafusion_expr_common::placement::ExpressionPlacement;
 use std::any::Any;
 use std::cmp::Ordering;
 use std::fmt::Debug;
@@ -361,6 +362,13 @@ impl ScalarUDF {
     pub fn as_async(&self) -> Option<&AsyncScalarUDF> {
         self.inner().as_any().downcast_ref::<AsyncScalarUDF>()
     }
+
+    /// Returns placement information for this function.
+    ///
+    /// See [`ScalarUDFImpl::placement`] for more details.
+    pub fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
+        self.inner.placement(args)
+    }
 }
 
 impl<F> From<F> for ScalarUDF
@@ -964,6 +972,20 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
     fn documentation(&self) -> Option<&Documentation> {
         None
     }
+
+    /// Returns placement information for this function.
+    ///
+    /// This is used by optimizers to make decisions about expression placement,
+    /// such as whether to push expressions down through projections.
+    ///
+    /// The default implementation returns [`ExpressionPlacement::KeepInPlace`],
+    /// meaning the expression should be kept where it is in the plan.
+    ///
+    /// Override this method to indicate that the function can be pushed down
+    /// closer to the data source.
+    fn placement(&self, _args: &[ExpressionPlacement]) -> ExpressionPlacement {
+        ExpressionPlacement::KeepInPlace
+    }
 }
 
 /// ScalarUDF that adds an alias to the underlying function. It is better to
@@ -1091,6 +1113,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
     fn documentation(&self) -> Option<&Documentation> {
         self.inner.documentation()
     }
+
+    fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
+        self.inner.placement(args)
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/functions/src/core/getfield.rs b/datafusion/functions/src/core/getfield.rs
index 47a903639..8d1ffb7c4 100644
--- a/datafusion/functions/src/core/getfield.rs
+++ b/datafusion/functions/src/core/getfield.rs
@@ -33,8 +33,8 @@ use datafusion_common::{
 use datafusion_expr::expr::ScalarFunction;
 use datafusion_expr::simplify::ExprSimplifyResult;
 use datafusion_expr::{
-    ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF,
-    ScalarUDFImpl, Signature, Volatility,
+    ColumnarValue, Documentation, Expr, ExpressionPlacement, ReturnFieldArgs,
+    ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
 };
 use datafusion_macros::user_doc;
 
@@ -499,6 +499,32 @@ impl ScalarUDFImpl for GetFieldFunc {
     fn documentation(&self) -> Option<&Documentation> {
         self.doc()
     }
+
+    fn placement(&self, args: &[ExpressionPlacement]) -> ExpressionPlacement {
+        // get_field can be pushed to leaves if:
+        // 1. The base (first arg) is a column or already placeable at leaves
+        // 2. All field keys (remaining args) are literals
+        if args.is_empty() {
+            return ExpressionPlacement::KeepInPlace;
+        }
+
+        let base_placement = args[0];
+        let base_is_pushable = matches!(
+            base_placement,
+            ExpressionPlacement::Column | ExpressionPlacement::MoveTowardsLeafNodes
+        );
+
+        let all_keys_are_literals = args
+            .iter()
+            .skip(1)
+            .all(|p| matches!(p, ExpressionPlacement::Literal));
+
+        if base_is_pushable && all_keys_are_literals {
+            ExpressionPlacement::MoveTowardsLeafNodes
+        } else {
+            ExpressionPlacement::KeepInPlace
+        }
+    }
 }
 
 #[cfg(test)]
@@ -542,4 +568,92 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_placement_literal_key() {
+        let func = GetFieldFunc::new();
+
+        // get_field(col, 'literal') -> leaf-pushable (static field access)
+        let args = vec![ExpressionPlacement::Column, ExpressionPlacement::Literal];
+        assert_eq!(
+            func.placement(&args),
+            ExpressionPlacement::MoveTowardsLeafNodes
+        );
+
+        // get_field(col, 'a', 'b') -> leaf-pushable (nested static field access)
+        let args = vec![
+            ExpressionPlacement::Column,
+            ExpressionPlacement::Literal,
+            ExpressionPlacement::Literal,
+        ];
+        assert_eq!(
+            func.placement(&args),
+            ExpressionPlacement::MoveTowardsLeafNodes
+        );
+
+        // get_field(get_field(col, 'a'), 'b') represented as MoveTowardsLeafNodes for base
+        let args = vec![
+            ExpressionPlacement::MoveTowardsLeafNodes,
+            ExpressionPlacement::Literal,
+        ];
+        assert_eq!(
+            func.placement(&args),
+            ExpressionPlacement::MoveTowardsLeafNodes
+        );
+    }
+
+    #[test]
+    fn test_placement_column_key() {
+        let func = GetFieldFunc::new();
+
+        // get_field(col, other_col) -> NOT leaf-pushable (dynamic per-row lookup)
+        let args = vec![ExpressionPlacement::Column, ExpressionPlacement::Column];
+        assert_eq!(func.placement(&args), ExpressionPlacement::KeepInPlace);
+
+        // get_field(col, 'a', other_col) -> NOT leaf-pushable (dynamic nested lookup)
+        let args = vec![
+            ExpressionPlacement::Column,
+            ExpressionPlacement::Literal,
+            ExpressionPlacement::Column,
+        ];
+        assert_eq!(func.placement(&args), ExpressionPlacement::KeepInPlace);
+    }
+
+    #[test]
+    fn test_placement_root() {
+        let func = GetFieldFunc::new();
+
+        // get_field(root_expr, 'literal') -> NOT leaf-pushable
+        let args = vec![
+            ExpressionPlacement::KeepInPlace,
+            ExpressionPlacement::Literal,
+        ];
+        assert_eq!(func.placement(&args), ExpressionPlacement::KeepInPlace);
+
+        // get_field(col, root_expr) -> NOT leaf-pushable
+        let args = vec![
+            ExpressionPlacement::Column,
+            ExpressionPlacement::KeepInPlace,
+        ];
+        assert_eq!(func.placement(&args), ExpressionPlacement::KeepInPlace);
+    }
+
+    #[test]
+    fn test_placement_edge_cases() {
+        let func = GetFieldFunc::new();
+
+        // Empty args -> NOT leaf-pushable
+        assert_eq!(func.placement(&[]), ExpressionPlacement::KeepInPlace);
+
+        // Just base, no key -> MoveTowardsLeafNodes (not a valid call but should handle gracefully)
+        let args = vec![ExpressionPlacement::Column];
+        assert_eq!(
+            func.placement(&args),
+            ExpressionPlacement::MoveTowardsLeafNodes
+        );
+
+        // Literal base with literal key -> NOT leaf-pushable (would be constant-folded)
+        let args = vec![ExpressionPlacement::Literal, ExpressionPlacement::Literal];
+        assert_eq!(func.placement(&args), ExpressionPlacement::KeepInPlace);
+    }
 }
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index d9273a8f6..5d29892a2 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -702,6 +702,11 @@ impl CSEController for ExprCSEController<'_> {
         #[expect(deprecated)]
         let is_normal_minus_aggregates = matches!(
             node,
+            // TODO: there's an argument for removing `Literal` from here,
+            // maybe using `Expr::placemement().should_push_to_leaves()` instead
+            // so that we extract common literals and don't broadcast them to num_batch_rows multiple times.
+            // However that currently breaks things like `percentile_cont()` which expect literal arguments
+            // (and would instead be getting `col(__common_expr_n)`).
             Expr::Literal(..)
                 | Expr::Column(..)
                 | Expr::ScalarVariable(..)
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs
index f97b05ea6..9cccb20bc 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -525,15 +525,14 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
     expr.iter()
         .for_each(|expr| expr.add_column_ref_counts(&mut column_referral_map));
 
-    // If an expression is non-trivial and appears more than once, do not merge
+    // If an expression is non-trivial (KeepInPlace) and appears more than once, do not merge
     // them as consecutive projections will benefit from a compute-once approach.
     // For details, see: https://github.com/apache/datafusion/issues/8296
     if column_referral_map.into_iter().any(|(col, usage)| {
         usage > 1
-            && !is_expr_trivial(
-                &prev_projection.expr
-                    [prev_projection.schema.index_of_column(col).unwrap()],
-            )
+            && !prev_projection.expr[prev_projection.schema.index_of_column(col).unwrap()]
+                .placement()
+                .should_push_to_leaves()
     }) {
         // no change
         return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
@@ -586,11 +585,6 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
     }
 }
 
-// Check whether `expr` is trivial; i.e. it doesn't imply any computation.
-fn is_expr_trivial(expr: &Expr) -> bool {
-    matches!(expr, Expr::Column(_) | Expr::Literal(_, _))
-}
-
 /// Rewrites a projection expression using the projection before it (i.e. its input)
 /// This is a subroutine to the `merge_consecutive_projections` function.
 ///
diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs
index 2358a2194..7107b0a90 100644
--- a/datafusion/physical-expr-common/src/physical_expr.rs
+++ b/datafusion/physical-expr-common/src/physical_expr.rs
@@ -35,6 +35,7 @@ use datafusion_common::{
 };
 use datafusion_expr_common::columnar_value::ColumnarValue;
 use datafusion_expr_common::interval_arithmetic::Interval;
+use datafusion_expr_common::placement::ExpressionPlacement;
 use datafusion_expr_common::sort_properties::ExprProperties;
 use datafusion_expr_common::statistics::Distribution;
 
@@ -430,6 +431,16 @@ pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
     fn is_volatile_node(&self) -> bool {
         false
     }
+
+    /// Returns placement information for this expression.
+    ///
+    /// This is used by optimizers to make decisions about expression placement,
+    /// such as whether to push expressions down through projections.
+    ///
+    /// The default implementation returns [`ExpressionPlacement::KeepInPlace`].
+    fn placement(&self) -> ExpressionPlacement {
+        ExpressionPlacement::KeepInPlace
+    }
 }
 
 #[deprecated(
diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs
index 8c7e8c319..cf844790a 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -30,6 +30,7 @@ use arrow::{
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{Result, internal_err, plan_err};
 use datafusion_expr::ColumnarValue;
+use datafusion_expr_common::placement::ExpressionPlacement;
 
 /// Represents the column at a given index in a RecordBatch
 ///
@@ -146,6 +147,10 @@ impl PhysicalExpr for Column {
     fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "{}", self.name)
     }
+
+    fn placement(&self) -> ExpressionPlacement {
+        ExpressionPlacement::Column
+    }
 }
 
 impl Column {
diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs
index 1f3fefc60..9105297c9 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -33,6 +33,7 @@ use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::Expr;
 use datafusion_expr_common::columnar_value::ColumnarValue;
 use datafusion_expr_common::interval_arithmetic::Interval;
+use datafusion_expr_common::placement::ExpressionPlacement;
 use datafusion_expr_common::sort_properties::{ExprProperties, SortProperties};
 
 /// Represents a literal value
@@ -134,6 +135,10 @@ impl PhysicalExpr for Literal {
     fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         std::fmt::Display::fmt(self, f)
     }
+
+    fn placement(&self) -> ExpressionPlacement {
+        ExpressionPlacement::Literal
+    }
 }
 
 /// Create a literal expression
diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs
index aa090743a..dab4153fa 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -45,8 +45,8 @@ use datafusion_expr::interval_arithmetic::Interval;
 use datafusion_expr::sort_properties::ExprProperties;
 use datafusion_expr::type_coercion::functions::fields_with_udf;
 use datafusion_expr::{
-    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, Volatility,
-    expr_vec_fmt,
+    ColumnarValue, ExpressionPlacement, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF,
+    Volatility, expr_vec_fmt,
 };
 
 /// Physical expression of a scalar function
@@ -362,6 +362,12 @@ impl PhysicalExpr for ScalarFunctionExpr {
     fn is_volatile_node(&self) -> bool {
         self.fun.signature().volatility == Volatility::Volatile
     }
+
+    fn placement(&self) -> ExpressionPlacement {
+        let arg_placements: Vec<_> =
+            self.args.iter().map(|arg| arg.placement()).collect();
+        self.fun.placement(&arg_placements)
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs
index 8d4c775f8..f00360292 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -20,7 +20,7 @@
 //! of a projection on table `t1` where the expressions `a`, `b`, and `a+b` are the
 //! projection expressions. `SELECT` without `FROM` will only evaluate expressions.
 
-use super::expressions::{Column, Literal};
+use super::expressions::Column;
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::{
     DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
@@ -48,6 +48,7 @@ use datafusion_common::tree_node::{
 };
 use datafusion_common::{DataFusionError, JoinSide, Result, internal_err};
 use datafusion_execution::TaskContext;
+use datafusion_expr::ExpressionPlacement;
 use datafusion_physical_expr::equivalence::ProjectionMapping;
 use datafusion_physical_expr::projection::Projector;
 use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
@@ -285,10 +286,13 @@ impl ExecutionPlan for ProjectionExec {
                 .as_ref()
                 .iter()
                 .all(|proj_expr| {
-                    proj_expr.expr.as_any().is::<Column>()
-                        || proj_expr.expr.as_any().is::<Literal>()
+                    !matches!(
+                        proj_expr.expr.placement(),
+                        ExpressionPlacement::KeepInPlace
+                    )
                 });
-        // If expressions are all either column_expr or Literal, then all computations in this projection are reorder or rename,
+        // If expressions are all either column_expr or Literal (or other cheap expressions),
+        // then all computations in this projection are reorder or rename,
         // and projection would not benefit from the repartition, 
benefits_from_input_partitioning will return false.
         vec![!all_simple_exprs]
     }
@@ -1003,11 +1007,15 @@ fn try_unifying_projections(
             .unwrap();
     });
     // Merging these projections is not beneficial, e.g
-    // If an expression is not trivial and it is referred more than 1, unifies projections will be
+    // If an expression is not trivial (KeepInPlace) and it is referred more than 1, unifies projections will be
     // beneficial as caching mechanism for non-trivial computations.
     // See discussion in: https://github.com/apache/datafusion/issues/8296
     if column_ref_map.iter().any(|(column, count)| {
-        *count > 1 && !is_expr_trivial(&Arc::clone(&child.expr()[column.index()].expr))
+        *count > 1
+            && !child.expr()[column.index()]
+                .expr
+                .placement()
+                .should_push_to_leaves()
     }) {
         return Ok(None);
     }
@@ -1117,13 +1125,6 @@ fn new_columns_for_join_on(
     (new_columns.len() == hash_join_on.len()).then_some(new_columns)
 }
 
-/// Checks if the given expression is trivial.
-/// An expression is considered trivial if it is either a `Column` or a `Literal`.
-fn is_expr_trivial(expr: &Arc<dyn PhysicalExpr>) -> bool {
-    expr.as_any().downcast_ref::<Column>().is_some()
-        || expr.as_any().downcast_ref::<Literal>().is_some()
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 4c49fae4d..ab217b192 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -7952,8 +7952,9 @@ logical_plan
 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
 03)----TableScan: t projection=[]
 physical_plan
-01)ProjectionExec: expr=[2 as count(Int64(1)), 2 as count()]
-02)--PlaceholderRowExec
+01)ProjectionExec: expr=[count(Int64(1))@0 as count(Int64(1)), count(Int64(1))@0 as count()]
+02)--ProjectionExec: expr=[2 as count(Int64(1))]
+03)----PlaceholderRowExec
 
 query II
 select count(1), count(*) from t;
@@ -7968,8 +7969,9 @@ logical_plan
 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
 03)----TableScan: t projection=[]
 physical_plan
-01)ProjectionExec: expr=[2 as count(Int64(1)), 2 as count(*)]
-02)--PlaceholderRowExec
+01)ProjectionExec: expr=[count(Int64(1))@0 as count(Int64(1)), count(Int64(1))@0 as count(*)]
+02)--ProjectionExec: expr=[2 as count(Int64(1))]
+03)----PlaceholderRowExec
 
 query II
 select count(), count(*) from t;
@@ -7984,8 +7986,9 @@ logical_plan
 02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
 03)----TableScan: t projection=[]
 physical_plan
-01)ProjectionExec: expr=[2 as count(), 2 as count(*)]
-02)--PlaceholderRowExec
+01)ProjectionExec: expr=[count(Int64(1))@0 as count(), count(Int64(1))@0 as count(*)]
+02)--ProjectionExec: expr=[2 as count(Int64(1))]
+03)----PlaceholderRowExec
 
 query TT
 explain select count(1) * count(2) from t;
diff --git a/datafusion/sqllogictest/test_files/projection_pushdown.slt b/datafusion/sqllogictest/test_files/projection_pushdown.slt
index 4be835894..3c148561d 100644
--- a/datafusion/sqllogictest/test_files/projection_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/projection_pushdown.slt
@@ -932,6 +932,21 @@ SELECT id, id + 100 as computed FROM simple_struct ORDER BY id LIMIT 3;
 # plan extracts the shared get_field for efficient computation
 ###
 
+query TT
+EXPLAIN SELECT (id + s['value']) * (id + s['value']) as id_and_value FROM simple_struct WHERE id > 2;
+----
+logical_plan
+01)Projection: __common_expr_1 * __common_expr_1 AS id_and_value
+02)--Projection: simple_struct.id + get_field(simple_struct.s, Utf8("value")) AS __common_expr_1
+03)----Filter: simple_struct.id > Int64(2)
+04)------TableScan: simple_struct projection=[id, s], partial_filters=[simple_struct.id > Int64(2)]
+physical_plan
+01)ProjectionExec: expr=[__common_expr_1@0 * __common_expr_1@0 as id_and_value]
+02)--ProjectionExec: expr=[id@0 + get_field(s@1, value) as __common_expr_1]
+03)----FilterExec: id@0 > 2
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id, s], file_type=parquet, predicate=id@0 > 2, pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 2, required_guarantees=[]
+
+
 query TT
 EXPLAIN SELECT s['value'] + s['value'] as doubled FROM simple_struct WHERE id > 2;
 ----
@@ -941,10 +956,9 @@ logical_plan
 03)----Filter: simple_struct.id > Int64(2)
 04)------TableScan: simple_struct projection=[id, s], partial_filters=[simple_struct.id > Int64(2)]
 physical_plan
-01)ProjectionExec: expr=[__common_expr_1@0 + __common_expr_1@0 as doubled]
-02)--ProjectionExec: expr=[get_field(s@0, value) as __common_expr_1]
-03)----FilterExec: id@0 > 2, projection=[s@1]
-04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id, s], file_type=parquet, predicate=id@0 > 2, pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 2, required_guarantees=[]
+01)ProjectionExec: expr=[get_field(s@0, value) + get_field(s@0, value) as doubled]
+02)--FilterExec: id@0 > 2, projection=[s@1]
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id, s], file_type=parquet, predicate=id@0 > 2, pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 2, required_guarantees=[]
 
 # Verify correctness
 query I
diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt
index f939cd015..1a6b82020 100644
--- a/datafusion/sqllogictest/test_files/unnest.slt
+++ b/datafusion/sqllogictest/test_files/unnest.slt
@@ -673,8 +673,8 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[__unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1],depth=2)@0 as UNNEST(UNNEST(UNNEST(recursive_unnest_table.column3)[c1])), column3@1 as column3]
 02)--UnnestExec
-03)----ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3]
-04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3]
 05)--------UnnestExec
 06)----------ProjectionExec: expr=[column3@0 as __unnest_placeholder(recursive_unnest_table.column3), column3@0 as column3]
 07)------------DataSourceExec: partitions=1, partition_sizes=[1]

