alamb commented on code in PR #20238:
URL: https://github.com/apache/datafusion/pull/20238#discussion_r2784291420


##########
datafusion/optimizer/src/extract_leaf_expressions.rs:
##########
@@ -0,0 +1,1765 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Two-pass optimizer pipeline that pushes cheap expressions (like struct 
field
+//! access `user['status']`) closer to data sources, enabling early data 
reduction
+//! and source-level optimizations (e.g., Parquet column pruning). See
+//! [`ExtractLeafExpressions`] (pass 1) and [`PushDownLeafProjections`] (pass 
2).
+
+use datafusion_common::Result;
+use datafusion_common::tree_node::Transformed;
+use datafusion_expr::logical_plan::LogicalPlan;
+
+use crate::optimizer::ApplyOrder;
+use crate::{OptimizerConfig, OptimizerRule};
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from non-projection nodes
+/// into **extraction projections** (pass 1 of 2).
+///
+/// This handles Filter, Sort, Limit, Aggregate, and Join nodes. For Projection
+/// nodes, extraction and pushdown are handled by [`PushDownLeafProjections`].
+///
+/// # Key Concepts
+///
+/// **Extraction projection**: a projection inserted *below* a node that
+/// pre-computes a cheap expression and exposes it under an alias
+/// (`__datafusion_extracted_N`). The parent node then references the alias
+/// instead of the original expression.
+///
+/// **Recovery projection**: a projection inserted *above* a node to restore
+/// the original output schema when extraction changes it.
+/// Schema-preserving nodes (Filter, Sort, Limit) gain extra columns from
+/// the extraction projection that bubble up; the recovery projection selects
+/// only the original columns to hide the extras.
+///
+/// # Example
+///
+/// Given a filter with a struct field access:
+///
+/// ```text
+/// Filter: user['status'] = 'active'
+///   TableScan: t [id, user]
+/// ```
+///
+/// This rule:
+/// 1. Inserts an **extraction projection** below the filter:
+/// 2. Adds a **recovery projection** above to hide the extra column:
+///
+/// ```text
+/// Projection: id, user                                                       
 <-- recovery projection
+///   Filter: __datafusion_extracted_1 = 'active'
+///     Projection: user['status'] AS __datafusion_extracted_1, id, user       
  <-- extraction projection
+///       TableScan: t [id, user]
+/// ```
+///
+/// **Important:** The `PushDownFilter` rule is aware of projections created 
by this rule
+/// and will not push filters through them. See `is_extracted_expr_projection` 
in utils.rs.
+#[derive(Default, Debug)]
+pub struct ExtractLeafExpressions {}
+
+impl ExtractLeafExpressions {
+    /// Create a new [`ExtractLeafExpressions`]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for ExtractLeafExpressions {
+    fn name(&self) -> &str {
+        "extract_leaf_expressions"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::TopDown)
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        Ok(Transformed::no(plan))
+    }
+}
+
+// 
=============================================================================
+// Pass 2: PushDownLeafProjections
+// 
=============================================================================
+
+/// Pushes extraction projections down through schema-preserving nodes towards
+/// leaf nodes (pass 2 of 2, after [`ExtractLeafExpressions`]).
+///
+/// Handles two types of projections:
+/// - **Pure extraction projections** (all `__datafusion_extracted` aliases + 
columns):
+///   pushes through Filter/Sort/Limit, merges into existing projections, or 
routes
+///   into multi-input node inputs (Join, SubqueryAlias, etc.)
+/// - **Mixed projections** (user projections containing `MoveTowardsLeafNodes`
+///   sub-expressions): splits into a recovery projection + extraction 
projection,
+///   then pushes the extraction projection down.
+///
+/// # Example: Pushing through a Filter
+///
+/// After pass 1, the extraction projection sits directly below the filter:
+/// ```text
+/// Projection: id, user                                                       
<-- recovery
+///   Filter: __extracted_1 = 'active'
+///     Projection: user['status'] AS __extracted_1, id, user                  
 <-- extraction
+///       TableScan: t [id, user]
+/// ```
+///
+/// Pass 2 pushes the extraction projection through the recovery and filter,
+/// and a subsequent `OptimizeProjections` pass removes the (now-redundant)
+/// recovery projection:
+/// ```text
+/// Filter: __extracted_1 = 'active'
+///   Projection: user['status'] AS __extracted_1, id, user                    
 <-- extraction (pushed down)
+///     TableScan: t [id, user]
+/// ```
+#[derive(Default, Debug)]
+pub struct PushDownLeafProjections {}
+
+impl PushDownLeafProjections {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for PushDownLeafProjections {
+    fn name(&self) -> &str {
+        "push_down_leaf_projections"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::TopDown)
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        Ok(Transformed::no(plan))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::*;
+    use crate::optimize_projections::OptimizeProjections;
+    use crate::test::*;
+    use crate::{OptimizerContext, assert_optimized_plan_eq_snapshot};
+    use arrow::datatypes::DataType;
+    use datafusion_common::Result;
+    use datafusion_expr::expr::ScalarFunction;
+    use datafusion_expr::{
+        ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
+        TypeSignature, col, lit, logical_plan::builder::LogicalPlanBuilder,
+    };
+    use datafusion_expr::{Expr, ExpressionPlacement};
+
+    /// A mock UDF that simulates a leaf-pushable function like `get_field`.
+    /// It returns `MoveTowardsLeafNodes` when its first argument is Column or 
MoveTowardsLeafNodes.
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    struct MockLeafFunc {
+        signature: Signature,
+    }
+
+    impl MockLeafFunc {
+        fn new() -> Self {
+            Self {
+                signature: Signature::new(
+                    TypeSignature::Any(2),
+                    datafusion_expr::Volatility::Immutable,
+                ),
+            }
+        }
+    }
+
+    impl ScalarUDFImpl for MockLeafFunc {
+        fn as_any(&self) -> &dyn std::any::Any {
+            self
+        }
+
+        fn name(&self) -> &str {
+            "mock_leaf"
+        }
+
+        fn signature(&self) -> &Signature {
+            &self.signature
+        }
+
+        fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
+            Ok(DataType::Utf8)
+        }
+
+        fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+            unimplemented!("This is only used for testing optimization")
+        }
+
+        fn placement(&self, args: &[ExpressionPlacement]) -> 
ExpressionPlacement {
+            // Return MoveTowardsLeafNodes if first arg is Column or 
MoveTowardsLeafNodes
+            // (like get_field does)
+            match args.first() {
+                Some(ExpressionPlacement::Column)
+                | Some(ExpressionPlacement::MoveTowardsLeafNodes) => {
+                    ExpressionPlacement::MoveTowardsLeafNodes
+                }
+                _ => ExpressionPlacement::KeepInPlace,
+            }
+        }
+    }
+
+    fn mock_leaf(expr: Expr, name: &str) -> Expr {
+        Expr::ScalarFunction(ScalarFunction::new_udf(
+            Arc::new(ScalarUDF::new_from_impl(MockLeafFunc::new())),
+            vec![expr, lit(name)],
+        ))
+    }
+
+    // 
=========================================================================
+    // Test assertion macros - 4 stages of the optimization pipeline
+    // All stages run OptimizeProjections first to match the actual rule 
layout.
+    // 
=========================================================================
+
+    /// Stage 1: Original plan with OptimizeProjections (baseline without 
extraction).
+    /// This shows the plan as it would be without our extraction rules.
+    macro_rules! assert_original_plan {
+        ($plan:expr, @ $expected:literal $(,)?) => {{
+            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
+            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> =
+                vec![Arc::new(OptimizeProjections::new())];
+            assert_optimized_plan_eq_snapshot!(optimizer_ctx, rules, 
$plan.clone(), @ $expected,)
+        }};
+    }
+
+    /// Stage 2: OptimizeProjections + ExtractLeafExpressions (shows 
extraction projections).
+    macro_rules! assert_after_extract {
+        ($plan:expr, @ $expected:literal $(,)?) => {{
+            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
+            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![
+                Arc::new(OptimizeProjections::new()),
+                Arc::new(ExtractLeafExpressions::new()),
+            ];
+            assert_optimized_plan_eq_snapshot!(optimizer_ctx, rules, 
$plan.clone(), @ $expected,)
+        }};
+    }
+
+    /// Stage 3: OptimizeProjections + Extract + PushDown (extraction pushed 
through schema-preserving nodes).
+    macro_rules! assert_after_pushdown {
+        ($plan:expr, @ $expected:literal $(,)?) => {{
+            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
+            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![
+                Arc::new(OptimizeProjections::new()),
+                Arc::new(ExtractLeafExpressions::new()),
+                Arc::new(PushDownLeafProjections::new()),
+            ];
+            assert_optimized_plan_eq_snapshot!(optimizer_ctx, rules, 
$plan.clone(), @ $expected,)
+        }};
+    }
+
+    /// Stage 4: Full pipeline - OptimizeProjections + Extract + PushDown + 
OptimizeProjections (final).
+    macro_rules! assert_optimized {
+        ($plan:expr, @ $expected:literal $(,)?) => {{
+            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
+            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![
+                Arc::new(OptimizeProjections::new()),
+                Arc::new(ExtractLeafExpressions::new()),
+                Arc::new(PushDownLeafProjections::new()),
+                Arc::new(OptimizeProjections::new()),
+            ];
+            assert_optimized_plan_eq_snapshot!(optimizer_ctx, rules, 
$plan.clone(), @ $expected,)
+        }};
+    }
+
+    #[test]
+    fn test_extract_from_filter() -> Result<()> {
+        let table_scan = test_table_scan_with_struct()?;
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
+            .filter(mock_leaf(col("user"), "status").eq(lit("active")))?
+            .select(vec![
+                table_scan
+                    .schema()
+                    .index_of_column_by_name(None, "id")
+                    .unwrap(),
+            ])?
+            .build()?;
+
+        assert_original_plan!(plan, @r#"
+        Projection: test.id
+          Filter: mock_leaf(test.user, Utf8("status")) = Utf8("active")
+            TableScan: test projection=[id, user]
+        "#)?;
+
+        assert_after_extract!(plan, @r#"
+        Projection: test.id
+          Filter: mock_leaf(test.user, Utf8("status")) = Utf8("active")
+            TableScan: test projection=[id, user]
+        "#)?;
+
+        // Note: An outer projection is added to preserve the original schema
+        assert_optimized!(plan, @r#"
+        Projection: test.id
+          Filter: mock_leaf(test.user, Utf8("status")) = Utf8("active")
+            TableScan: test projection=[id, user]
+        "#)
+    }

Review Comment:
   A minor comment: I think this might be slightly easier to maintain if you 
created a single macro / function
   
   Basically, a function that returns a single String containing every stage 
of the plan, so that you can review the entire thing as one `insta` snapshot
   
   Something like
   
   ```rust
           insta_assert!(do_optimize, @r#"
          ## Original Plan             <-------- add a heading to separate the 
plans
           Projection: test.id
             Filter: mock_leaf(test.user, Utf8("status")) = Utf8("active")
               TableScan: test projection=[id, user]
   
          ## Plan after Extraction
           Projection: test.id
             Filter: mock_leaf(test.user, Utf8("status")) = Utf8("active")
               TableScan: test projection=[id, user]
   
          ## Optimized Plan
           Projection: test.id
             Filter: mock_leaf(test.user, Utf8("status")) = Utf8("active")
               TableScan: test projection=[id, user]
           "#)
      
   ```
   
   You could also do something nice like only printing the plan after 
extraction and the optimized plan if they differ (textually) from the original
   
   ```rust
           insta_assert!(do_optimize, @r#"
          ## Original Plan
           Projection: test.id
             Filter: mock_leaf(test.user, Utf8("status")) = Utf8("active")
               TableScan: test projection=[id, user]
   
          ## Plan after Extraction
            (SAME AS ABOVE)
   
          ## Optimized Plan
            (SAME AS ABOVE)
           "#)
   ```
   
   That would help make it easier to understand what is expected (and would 
probably cut this diff down by a lot)



##########
datafusion/optimizer/src/extract_leaf_expressions.rs:
##########
@@ -0,0 +1,1765 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Two-pass optimizer pipeline that pushes cheap expressions (like struct 
field
+//! access `user['status']`) closer to data sources, enabling early data 
reduction
+//! and source-level optimizations (e.g., Parquet column pruning). See
+//! [`ExtractLeafExpressions`] (pass 1) and [`PushDownLeafProjections`] (pass 
2).
+
+use datafusion_common::Result;
+use datafusion_common::tree_node::Transformed;
+use datafusion_expr::logical_plan::LogicalPlan;
+
+use crate::optimizer::ApplyOrder;
+use crate::{OptimizerConfig, OptimizerRule};
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from non-projection nodes
+/// into **extraction projections** (pass 1 of 2).
+///
+/// This handles Filter, Sort, Limit, Aggregate, and Join nodes. For Projection
+/// nodes, extraction and pushdown are handled by [`PushDownLeafProjections`].
+///
+/// # Key Concepts
+///
+/// **Extraction projection**: a projection inserted *below* a node that
+/// pre-computes a cheap expression and exposes it under an alias
+/// (`__datafusion_extracted_N`). The parent node then references the alias
+/// instead of the original expression.
+///
+/// **Recovery projection**: a projection inserted *above* a node to restore
+/// the original output schema when extraction changes it.
+/// Schema-preserving nodes (Filter, Sort, Limit) gain extra columns from
+/// the extraction projection that bubble up; the recovery projection selects
+/// only the original columns to hide the extras.
+///
+/// # Example
+///
+/// Given a filter with a struct field access:
+///
+/// ```text
+/// Filter: user['status'] = 'active'
+///   TableScan: t [id, user]
+/// ```
+///
+/// This rule:
+/// 1. Inserts an **extraction projection** below the filter:
+/// 2. Adds a **recovery projection** above to hide the extra column:
+///
+/// ```text
+/// Projection: id, user                                                       
 <-- recovery projection
+///   Filter: __datafusion_extracted_1 = 'active'
+///     Projection: user['status'] AS __datafusion_extracted_1, id, user       
  <-- extraction projection
+///       TableScan: t [id, user]
+/// ```
+///
+/// **Important:** The `PushDownFilter` rule is aware of projections created 
by this rule
+/// and will not push filters through them. See `is_extracted_expr_projection` 
in utils.rs.
+#[derive(Default, Debug)]
+pub struct ExtractLeafExpressions {}
+
+impl ExtractLeafExpressions {
+    /// Create a new [`ExtractLeafExpressions`]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for ExtractLeafExpressions {
+    fn name(&self) -> &str {
+        "extract_leaf_expressions"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::TopDown)
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        Ok(Transformed::no(plan))
+    }
+}
+
+// 
=============================================================================
+// Pass 2: PushDownLeafProjections
+// 
=============================================================================
+
+/// Pushes extraction projections down through schema-preserving nodes towards
+/// leaf nodes (pass 2 of 2, after [`ExtractLeafExpressions`]).
+///
+/// Handles two types of projections:
+/// - **Pure extraction projections** (all `__datafusion_extracted` aliases + 
columns):
+///   pushes through Filter/Sort/Limit, merges into existing projections, or 
routes
+///   into multi-input node inputs (Join, SubqueryAlias, etc.)
+/// - **Mixed projections** (user projections containing `MoveTowardsLeafNodes`
+///   sub-expressions): splits into a recovery projection + extraction 
projection,
+///   then pushes the extraction projection down.
+///
+/// # Example: Pushing through a Filter
+///
+/// After pass 1, the extraction projection sits directly below the filter:
+/// ```text
+/// Projection: id, user                                                       
<-- recovery
+///   Filter: __extracted_1 = 'active'
+///     Projection: user['status'] AS __extracted_1, id, user                  
 <-- extraction
+///       TableScan: t [id, user]
+/// ```
+///
+/// Pass 2 pushes the extraction projection through the recovery and filter,
+/// and a subsequent `OptimizeProjections` pass removes the (now-redundant)
+/// recovery projection:
+/// ```text
+/// Filter: __extracted_1 = 'active'
+///   Projection: user['status'] AS __extracted_1, id, user                    
 <-- extraction (pushed down)
+///     TableScan: t [id, user]
+/// ```
+#[derive(Default, Debug)]
+pub struct PushDownLeafProjections {}
+
+impl PushDownLeafProjections {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for PushDownLeafProjections {
+    fn name(&self) -> &str {
+        "push_down_leaf_projections"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::TopDown)
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        Ok(Transformed::no(plan))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::*;
+    use crate::optimize_projections::OptimizeProjections;
+    use crate::test::*;
+    use crate::{OptimizerContext, assert_optimized_plan_eq_snapshot};
+    use arrow::datatypes::DataType;
+    use datafusion_common::Result;
+    use datafusion_expr::expr::ScalarFunction;
+    use datafusion_expr::{
+        ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
+        TypeSignature, col, lit, logical_plan::builder::LogicalPlanBuilder,
+    };
+    use datafusion_expr::{Expr, ExpressionPlacement};
+
+    /// A mock UDF that simulates a leaf-pushable function like `get_field`.
+    /// It returns `MoveTowardsLeafNodes` when its first argument is Column or 
MoveTowardsLeafNodes.
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    struct MockLeafFunc {
+        signature: Signature,
+    }
+
+    impl MockLeafFunc {
+        fn new() -> Self {
+            Self {
+                signature: Signature::new(
+                    TypeSignature::Any(2),
+                    datafusion_expr::Volatility::Immutable,
+                ),
+            }
+        }
+    }
+
+    impl ScalarUDFImpl for MockLeafFunc {
+        fn as_any(&self) -> &dyn std::any::Any {
+            self
+        }
+
+        fn name(&self) -> &str {
+            "mock_leaf"
+        }
+
+        fn signature(&self) -> &Signature {
+            &self.signature
+        }
+
+        fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
+            Ok(DataType::Utf8)
+        }
+
+        fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+            unimplemented!("This is only used for testing optimization")
+        }
+
+        fn placement(&self, args: &[ExpressionPlacement]) -> 
ExpressionPlacement {
+            // Return MoveTowardsLeafNodes if first arg is Column or 
MoveTowardsLeafNodes
+            // (like get_field does)
+            match args.first() {
+                Some(ExpressionPlacement::Column)
+                | Some(ExpressionPlacement::MoveTowardsLeafNodes) => {
+                    ExpressionPlacement::MoveTowardsLeafNodes
+                }
+                _ => ExpressionPlacement::KeepInPlace,
+            }
+        }
+    }
+
+    fn mock_leaf(expr: Expr, name: &str) -> Expr {
+        Expr::ScalarFunction(ScalarFunction::new_udf(
+            Arc::new(ScalarUDF::new_from_impl(MockLeafFunc::new())),
+            vec![expr, lit(name)],
+        ))
+    }
+
+    // 
=========================================================================
+    // Test assertion macros - 4 stages of the optimization pipeline
+    // All stages run OptimizeProjections first to match the actual rule 
layout.
+    // 
=========================================================================
+
+    /// Stage 1: Original plan with OptimizeProjections (baseline without 
extraction).
+    /// This shows the plan as it would be without our extraction rules.
+    macro_rules! assert_original_plan {
+        ($plan:expr, @ $expected:literal $(,)?) => {{
+            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
+            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> =
+                vec![Arc::new(OptimizeProjections::new())];
+            assert_optimized_plan_eq_snapshot!(optimizer_ctx, rules, 
$plan.clone(), @ $expected,)
+        }};
+    }
+
+    /// Stage 2: OptimizeProjections + ExtractLeafExpressions (shows 
extraction projections).
+    macro_rules! assert_after_extract {
+        ($plan:expr, @ $expected:literal $(,)?) => {{
+            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
+            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![
+                Arc::new(OptimizeProjections::new()),
+                Arc::new(ExtractLeafExpressions::new()),
+            ];
+            assert_optimized_plan_eq_snapshot!(optimizer_ctx, rules, 
$plan.clone(), @ $expected,)
+        }};
+    }
+
+    /// Stage 3: OptimizeProjections + Extract + PushDown (extraction pushed 
through schema-preserving nodes).
+    macro_rules! assert_after_pushdown {
+        ($plan:expr, @ $expected:literal $(,)?) => {{
+            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
+            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![
+                Arc::new(OptimizeProjections::new()),
+                Arc::new(ExtractLeafExpressions::new()),
+                Arc::new(PushDownLeafProjections::new()),
+            ];
+            assert_optimized_plan_eq_snapshot!(optimizer_ctx, rules, 
$plan.clone(), @ $expected,)
+        }};
+    }
+
+    /// Stage 4: Full pipeline - OptimizeProjections + Extract + PushDown + 
OptimizeProjections (final).
+    macro_rules! assert_optimized {
+        ($plan:expr, @ $expected:literal $(,)?) => {{
+            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
+            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![
+                Arc::new(OptimizeProjections::new()),
+                Arc::new(ExtractLeafExpressions::new()),
+                Arc::new(PushDownLeafProjections::new()),
+                Arc::new(OptimizeProjections::new()),
+            ];
+            assert_optimized_plan_eq_snapshot!(optimizer_ctx, rules, 
$plan.clone(), @ $expected,)
+        }};
+    }
+
+    #[test]
+    fn test_extract_from_filter() -> Result<()> {
+        let table_scan = test_table_scan_with_struct()?;
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
+            .filter(mock_leaf(col("user"), "status").eq(lit("active")))?
+            .select(vec![
+                table_scan
+                    .schema()
+                    .index_of_column_by_name(None, "id")
+                    .unwrap(),
+            ])?
+            .build()?;
+
+        assert_original_plan!(plan, @r#"
+        Projection: test.id
+          Filter: mock_leaf(test.user, Utf8("status")) = Utf8("active")
+            TableScan: test projection=[id, user]
+        "#)?;
+
+        assert_after_extract!(plan, @r#"
+        Projection: test.id
+          Filter: mock_leaf(test.user, Utf8("status")) = Utf8("active")
+            TableScan: test projection=[id, user]
+        "#)?;
+
+        // Note: An outer projection is added to preserve the original schema
+        assert_optimized!(plan, @r#"
+        Projection: test.id
+          Filter: mock_leaf(test.user, Utf8("status")) = Utf8("active")
+            TableScan: test projection=[id, user]
+        "#)
+    }
+
+    #[test]
+    fn test_no_extraction_for_column() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .filter(col("a").eq(lit(1)))?
+            .build()?;
+
+        assert_original_plan!(plan, @r"
+        Filter: test.a = Int32(1)
+          TableScan: test projection=[a, b, c]
+        ")?;
+
+        assert_after_extract!(plan, @r"
+        Filter: test.a = Int32(1)
+          TableScan: test projection=[a, b, c]
+        ")?;
+
+        // No extraction should happen for simple columns
+        assert_optimized!(plan, @r"
+        Filter: test.a = Int32(1)
+          TableScan: test projection=[a, b, c]
+        ")
+    }
+
+    #[test]
+    fn test_extract_from_projection() -> Result<()> {
+        let table_scan = test_table_scan_with_struct()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![mock_leaf(col("user"), "name")])?
+            .build()?;
+
+        assert_original_plan!(plan, @r#"
+        Projection: mock_leaf(test.user, Utf8("name"))
+          TableScan: test projection=[user]
+        "#)?;
+
+        assert_after_extract!(plan, @r#"
+        Projection: mock_leaf(test.user, Utf8("name"))
+          TableScan: test projection=[user]
+        "#)?;
+
+        // Projection expressions with MoveTowardsLeafNodes are extracted

Review Comment:
   What does "extracted" mean here? The expression doesn't seem to have moved anywhere 🤔 



##########
datafusion/optimizer/src/extract_leaf_expressions.rs:
##########
@@ -0,0 +1,1765 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Two-pass optimizer pipeline that pushes cheap expressions (like struct field
+//! access `user['status']`) closer to data sources, enabling early data reduction
+//! and source-level optimizations (e.g., Parquet column pruning). See
+//! [`ExtractLeafExpressions`] (pass 1) and [`PushDownLeafProjections`] (pass 2).
+
+use datafusion_common::Result;
+use datafusion_common::tree_node::Transformed;
+use datafusion_expr::logical_plan::LogicalPlan;
+
+use crate::optimizer::ApplyOrder;
+use crate::{OptimizerConfig, OptimizerRule};
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from non-projection nodes
+/// into **extraction projections** (pass 1 of 2).
+///
+/// This handles Filter, Sort, Limit, Aggregate, and Join nodes. For Projection
+/// nodes, extraction and pushdown are handled by [`PushDownLeafProjections`].
+///
+/// # Key Concepts
+///
+/// **Extraction projection**: a projection inserted *below* a node that
+/// pre-computes a cheap expression and exposes it under an alias
+/// (`__datafusion_extracted_N`). The parent node then references the alias
+/// instead of the original expression.
+///
+/// **Recovery projection**: a projection inserted *above* a node to restore
+/// the original output schema when extraction changes it.
+/// Schema-preserving nodes (Filter, Sort, Limit) gain extra columns from
+/// the extraction projection that bubble up; the recovery projection selects
+/// only the original columns to hide the extras.
+///
+/// # Example
+///
+/// Given a filter with a struct field access:
+///
+/// ```text
+/// Filter: user['status'] = 'active'
+///   TableScan: t [id, user]
+/// ```
+///
+/// This rule:
+/// 1. Inserts an **extraction projection** below the filter:
+/// 2. Adds a **recovery projection** above to hide the extra column:
+///
+/// ```text
+/// Projection: id, user                                                  <-- recovery projection
+///   Filter: __datafusion_extracted_1 = 'active'
+///     Projection: user['status'] AS __datafusion_extracted_1, id, user  <-- extraction projection
+///       TableScan: t [id, user]
+/// ```
+///
+/// **Important:** The `PushDownFilter` rule is aware of projections created by this rule
+/// and will not push filters through them. See `is_extracted_expr_projection` in utils.rs.
+#[derive(Default, Debug)]
+pub struct ExtractLeafExpressions {}
+
+impl ExtractLeafExpressions {
+    /// Create a new [`ExtractLeafExpressions`]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for ExtractLeafExpressions {
+    fn name(&self) -> &str {
+        "extract_leaf_expressions"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::TopDown)
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        Ok(Transformed::no(plan))
+    }
+}
+
+// 
=============================================================================
+// Pass 2: PushDownLeafProjections
+// 
=============================================================================
+
+/// Pushes extraction projections down through schema-preserving nodes towards
+/// leaf nodes (pass 2 of 2, after [`ExtractLeafExpressions`]).
+///
+/// Handles two types of projections:
+/// - **Pure extraction projections** (all `__datafusion_extracted` aliases + columns):
+///   pushes through Filter/Sort/Limit, merges into existing projections, or routes
+///   into multi-input node inputs (Join, SubqueryAlias, etc.)
+/// - **Mixed projections** (user projections containing `MoveTowardsLeafNodes`
+///   sub-expressions): splits into a recovery projection + extraction projection,
+///   then pushes the extraction projection down.
+///
+/// # Example: Pushing through a Filter
+///
+/// After pass 1, the extraction projection sits directly below the filter:
+/// ```text
+/// Projection: id, user                                                       
<-- recovery
+///   Filter: __extracted_1 = 'active'
+///     Projection: user['status'] AS __extracted_1, id, user                  
 <-- extraction
+///       TableScan: t [id, user]
+/// ```
+///
+/// Pass 2 pushes the extraction projection through the recovery and filter,
+/// and a subsequent `OptimizeProjections` pass removes the (now-redundant)
+/// recovery projection:
+/// ```text
+/// Filter: __extracted_1 = 'active'
+///   Projection: user['status'] AS __extracted_1, id, user                    
 <-- extraction (pushed down)
+///     TableScan: t [id, user]
+/// ```
+#[derive(Default, Debug)]
+pub struct PushDownLeafProjections {}
+
+impl PushDownLeafProjections {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for PushDownLeafProjections {
+    fn name(&self) -> &str {
+        "push_down_leaf_projections"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::TopDown)
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        Ok(Transformed::no(plan))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::*;
+    use crate::optimize_projections::OptimizeProjections;
+    use crate::test::*;
+    use crate::{OptimizerContext, assert_optimized_plan_eq_snapshot};
+    use arrow::datatypes::DataType;
+    use datafusion_common::Result;
+    use datafusion_expr::expr::ScalarFunction;
+    use datafusion_expr::{
+        ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
+        TypeSignature, col, lit, logical_plan::builder::LogicalPlanBuilder,
+    };
+    use datafusion_expr::{Expr, ExpressionPlacement};
+
+    /// A mock UDF that simulates a leaf-pushable function like `get_field`.
+    /// It returns `MoveTowardsLeafNodes` when its first argument is Column or 
MoveTowardsLeafNodes.
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    struct MockLeafFunc {
+        signature: Signature,
+    }
+
+    impl MockLeafFunc {
+        fn new() -> Self {
+            Self {
+                signature: Signature::new(
+                    TypeSignature::Any(2),
+                    datafusion_expr::Volatility::Immutable,
+                ),
+            }
+        }
+    }
+
+    impl ScalarUDFImpl for MockLeafFunc {
+        fn as_any(&self) -> &dyn std::any::Any {
+            self
+        }
+
+        fn name(&self) -> &str {
+            "mock_leaf"
+        }
+
+        fn signature(&self) -> &Signature {
+            &self.signature
+        }
+
+        fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
+            Ok(DataType::Utf8)
+        }
+
+        fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+            unimplemented!("This is only used for testing optimization")
+        }
+
+        fn placement(&self, args: &[ExpressionPlacement]) -> 
ExpressionPlacement {
+            // Return MoveTowardsLeafNodes if first arg is Column or 
MoveTowardsLeafNodes
+            // (like get_field does)
+            match args.first() {
+                Some(ExpressionPlacement::Column)
+                | Some(ExpressionPlacement::MoveTowardsLeafNodes) => {
+                    ExpressionPlacement::MoveTowardsLeafNodes
+                }
+                _ => ExpressionPlacement::KeepInPlace,
+            }
+        }
+    }
+
+    fn mock_leaf(expr: Expr, name: &str) -> Expr {
+        Expr::ScalarFunction(ScalarFunction::new_udf(
+            Arc::new(ScalarUDF::new_from_impl(MockLeafFunc::new())),
+            vec![expr, lit(name)],
+        ))
+    }
+
+    // 
=========================================================================
+    // Test assertion macros - 4 stages of the optimization pipeline
+    // All stages run OptimizeProjections first to match the actual rule 
layout.
+    // 
=========================================================================
+
+    /// Stage 1: Original plan with OptimizeProjections (baseline without 
extraction).
+    /// This shows the plan as it would be without our extraction rules.
+    macro_rules! assert_original_plan {
+        ($plan:expr, @ $expected:literal $(,)?) => {{
+            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
+            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> =
+                vec![Arc::new(OptimizeProjections::new())];
+            assert_optimized_plan_eq_snapshot!(optimizer_ctx, rules, 
$plan.clone(), @ $expected,)
+        }};
+    }
+
+    /// Stage 2: OptimizeProjections + ExtractLeafExpressions (shows 
extraction projections).
+    macro_rules! assert_after_extract {
+        ($plan:expr, @ $expected:literal $(,)?) => {{
+            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
+            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![
+                Arc::new(OptimizeProjections::new()),
+                Arc::new(ExtractLeafExpressions::new()),
+            ];
+            assert_optimized_plan_eq_snapshot!(optimizer_ctx, rules, 
$plan.clone(), @ $expected,)
+        }};
+    }
+
+    /// Stage 3: OptimizeProjections + Extract + PushDown (extraction pushed 
through schema-preserving nodes).
+    macro_rules! assert_after_pushdown {
+        ($plan:expr, @ $expected:literal $(,)?) => {{
+            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
+            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![
+                Arc::new(OptimizeProjections::new()),
+                Arc::new(ExtractLeafExpressions::new()),
+                Arc::new(PushDownLeafProjections::new()),
+            ];
+            assert_optimized_plan_eq_snapshot!(optimizer_ctx, rules, 
$plan.clone(), @ $expected,)
+        }};
+    }
+
+    /// Stage 4: Full pipeline - OptimizeProjections + Extract + PushDown + 
OptimizeProjections (final).
+    macro_rules! assert_optimized {
+        ($plan:expr, @ $expected:literal $(,)?) => {{
+            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
+            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![
+                Arc::new(OptimizeProjections::new()),
+                Arc::new(ExtractLeafExpressions::new()),
+                Arc::new(PushDownLeafProjections::new()),
+                Arc::new(OptimizeProjections::new()),
+            ];
+            assert_optimized_plan_eq_snapshot!(optimizer_ctx, rules, 
$plan.clone(), @ $expected,)
+        }};
+    }
+
+    #[test]
+    fn test_extract_from_filter() -> Result<()> {
+        let table_scan = test_table_scan_with_struct()?;
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
+            .filter(mock_leaf(col("user"), "status").eq(lit("active")))?
+            .select(vec![
+                table_scan
+                    .schema()
+                    .index_of_column_by_name(None, "id")
+                    .unwrap(),
+            ])?
+            .build()?;
+
+        assert_original_plan!(plan, @r#"
+        Projection: test.id
+          Filter: mock_leaf(test.user, Utf8("status")) = Utf8("active")
+            TableScan: test projection=[id, user]
+        "#)?;
+
+        assert_after_extract!(plan, @r#"
+        Projection: test.id
+          Filter: mock_leaf(test.user, Utf8("status")) = Utf8("active")
+            TableScan: test projection=[id, user]
+        "#)?;
+
+        // Note: An outer projection is added to preserve the original schema

Review Comment:
   The outer projection is in the initial plan too, right? Or is that what you are trying to say with this comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to