ethan-tyler commented on code in PR #20117:
URL: https://github.com/apache/datafusion/pull/20117#discussion_r2800731660


##########
datafusion/optimizer/src/optimize_projections/mod.rs:
##########
@@ -559,7 +559,19 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
                 metadata,
             }) => rewrite_expr(*expr, &prev_projection).map(|result| {
                 result.update_data(|expr| {
-                    Expr::Alias(Alias::new(expr, relation, name).with_metadata(metadata))
+                    // After substitution, the inner expression may now have the
+                    // same schema_name as the alias (e.g. when an extraction
+                    // alias like `__extracted_1 AS f(x)` is resolved back to
+                    // `f(x)`). Wrapping in a redundant self-alias causes a
+                    // cosmetic `f(x) AS f(x)` due to Display vs schema_name
+                    // formatting differences. Drop the alias when it matches.
+                    if expr.schema_name().to_string() == name {
+                        expr
+                    } else {
+                        Expr::Alias(
+                            Alias::new(expr, relation, name).with_metadata(metadata),
+                        )
+                    }

Review Comment:
   Self-alias stripping drops `Alias.metadata`, and `SavedName::restore` doesn't
   recover it. Since self-aliases will be more common after this rewrite, I think
   it's best to guard on `metadata.is_none()` before stripping.
   
   Something like:
   ```suggestion
   if metadata.is_none() && expr.schema_name().to_string() == name {
       expr
   } else {
       Expr::Alias(
           Alias::new(expr, relation, name)
               .with_metadata(metadata),
       )
   }
   ```
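
To illustrate why the guard matters, here is a minimal standalone sketch. It does not use the real DataFusion types: `SimpleExpr`, its `schema_name`, and `realias` are hypothetical stand-ins, and a `BTreeMap` stands in for whatever the actual metadata type is. It only shows that a metadata-carrying self-alias survives while a bare one is dropped.

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified stand-in for DataFusion's `Expr`/`Alias`;
// the real types carry more fields (relation, typed metadata, ...).
#[derive(Debug, Clone, PartialEq)]
enum SimpleExpr {
    Column(String),
    Alias {
        expr: Box<SimpleExpr>,
        name: String,
        metadata: Option<BTreeMap<String, String>>,
    },
}

impl SimpleExpr {
    // Stand-in for `Expr::schema_name()`: an alias reports its own name.
    fn schema_name(&self) -> String {
        match self {
            SimpleExpr::Column(name) => name.clone(),
            SimpleExpr::Alias { name, .. } => name.clone(),
        }
    }
}

// Re-wrap `expr` in an alias unless the alias is a redundant self-alias
// AND carries no metadata that would be lost by dropping it.
fn realias(
    expr: SimpleExpr,
    name: String,
    metadata: Option<BTreeMap<String, String>>,
) -> SimpleExpr {
    if metadata.is_none() && expr.schema_name() == name {
        expr
    } else {
        SimpleExpr::Alias {
            expr: Box::new(expr),
            name,
            metadata,
        }
    }
}
```

With this shape, `realias(f_x, "f(x)", None)` returns the bare expression, while the same call with `Some(metadata)` keeps the alias wrapper so the metadata is not silently discarded.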



##########
datafusion/optimizer/src/optimize_projections/mod.rs:
##########
@@ -559,7 +559,19 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
                 metadata,
             }) => rewrite_expr(*expr, &prev_projection).map(|result| {
                 result.update_data(|expr| {
-                    Expr::Alias(Alias::new(expr, relation, name).with_metadata(metadata))
+                    // After substitution, the inner expression may now have the
+                    // same schema_name as the alias (e.g. when an extraction
+                    // alias like `__extracted_1 AS f(x)` is resolved back to
+                    // `f(x)`). Wrapping in a redundant self-alias causes a
+                    // cosmetic `f(x) AS f(x)` due to Display vs schema_name
+                    // formatting differences. Drop the alias when it matches.
+                    if expr.schema_name().to_string() == name {

Review Comment:
   Is there a defined contract for metadata preservation through alias rewrites?



##########
datafusion/optimizer/src/extract_leaf_expressions.rs:
##########
@@ -155,12 +685,494 @@ impl OptimizerRule for PushDownLeafProjections {
     fn rewrite(
         &self,
         plan: LogicalPlan,
-        _config: &dyn OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        Ok(Transformed::no(plan))
+        let alias_generator = config.alias_generator();
+        match try_push_input(&plan, alias_generator)? {
+            Some(new_plan) => Ok(Transformed::yes(new_plan)),
+            None => Ok(Transformed::no(plan)),
+        }
+    }
+}
+
+/// Attempts to push a projection's extractable expressions further down.
+///
+/// Returns `Some(new_subtree)` if the projection was pushed down or merged,
+/// `None` if there is nothing to push or the projection sits above a barrier.
+fn try_push_input(
+    input: &LogicalPlan,
+    alias_generator: &Arc<AliasGenerator>,
+) -> Result<Option<LogicalPlan>> {
+    let LogicalPlan::Projection(proj) = input else {
+        return Ok(None);
+    };
+    split_and_push_projection(proj, alias_generator)
+}
+
+/// Splits a projection into extractable pieces, pushes them towards leaf
+/// nodes, and adds a recovery projection if needed.
+///
+/// Handles both:
+/// - **Pure extraction projections** (all `__datafusion_extracted` aliases + columns)
+/// - **Mixed projections** (containing `MoveTowardsLeafNodes` sub-expressions)
+///
+/// Returns `Some(new_subtree)` if extractions were pushed down,
+/// `None` if there is nothing to extract or push.
+///
+/// # Example: Mixed Projection
+///
+/// ```text
+/// Input plan:
+///   Projection: user['name'] IS NOT NULL AS has_name, id
+///     Filter: ...
+///       TableScan
+///
+/// Phase 1 (Split):
+///   extraction_pairs: [(user['name'], "__datafusion_extracted_1")]
+///   recovery_exprs:   [__datafusion_extracted_1 IS NOT NULL AS has_name, id]
+///
+/// Phase 2 (Push):
+///   Push extraction projection through Filter toward TableScan
+///
+/// Phase 3 (Recovery):
+///   Projection: __datafusion_extracted_1 IS NOT NULL AS has_name, id       <-- recovery
+///     Filter: ...
+///       Projection: user['name'] AS __datafusion_extracted_1, id           <-- extraction (pushed)
+///         TableScan
+/// ```
+fn split_and_push_projection(
+    proj: &Projection,
+    alias_generator: &Arc<AliasGenerator>,
+) -> Result<Option<LogicalPlan>> {
+    let input = &proj.input;
+    let input_schema = input.schema();
+
+    // ── Phase 1: Split ──────────────────────────────────────────────────
+    // For each projection expression, collect extraction pairs and build
+    // recovery expressions.
+    //
+    // Pre-existing `__datafusion_extracted` aliases are inserted into the
+    // extractor's `IndexMap` with the **full** `Expr::Alias(…)` as the key,
+    // so the alias name participates in equality. This prevents collisions
+    // when CSE rewrites produce the same inner expression under different
+    // alias names (e.g. `__common_expr_4 AS __datafusion_extracted_1` and
+    // `__common_expr_4 AS __datafusion_extracted_3`). New extractions from
+    // `routing_extract` use bare (non-Alias) keys and get normal dedup.
+    //
+    // When building the final `extraction_pairs`, the Alias wrapper is
+    // stripped so consumers see the usual `(inner_expr, alias_name)` tuples.
+
+    let mut extractors = vec![LeafExpressionExtractor::new(
+        input_schema.as_ref(),
+        alias_generator,
+    )];
+    let input_column_sets = vec![schema_columns(input_schema.as_ref())];
+
+    let original_schema = proj.schema.as_ref();
+    let mut recovery_exprs: Vec<Expr> = Vec::with_capacity(proj.expr.len());
+    let mut needs_recovery = false;
+    let mut has_new_extractions = false;
+    let mut proj_exprs_captured: usize = 0;
+    // Track standalone column expressions (Case B) to detect column refs
+    // from extracted aliases (Case A) that aren't also standalone expressions.
+    let mut standalone_columns: IndexSet<Column> = IndexSet::new();
+
+    for (expr, (qualifier, field)) in proj.expr.iter().zip(original_schema.iter()) {
+        if let Expr::Alias(alias) = expr
+            && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
+        {

Review Comment:
   Same prefix collision here, but with a different impact: user aliases get
   treated as pre-existing extracted expressions and pushed down as if they were
   optimizer-generated.
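
A rough sketch of the misclassification, and of one possible mitigation if the prefix were documented as reserved. The prefix value is assumed from the doc comments in this PR, and `validate_user_alias` is a hypothetical helper, not an existing DataFusion function.

```rust
// Assumed value of the prefix, taken from the doc comments in this PR.
const EXTRACTED_EXPR_PREFIX: &str = "__datafusion_extracted";

// Mirrors the name-only check in the diff: any alias whose name starts
// with the prefix is treated as optimizer-generated.
fn looks_extracted(alias_name: &str) -> bool {
    alias_name.starts_with(EXTRACTED_EXPR_PREFIX)
}

// Hypothetical guard that could run on user-supplied aliases if the
// prefix were reserved: reject names the optimizer would misclassify.
fn validate_user_alias(alias_name: &str) -> Result<(), String> {
    if looks_extracted(alias_name) {
        Err(format!(
            "alias `{}` uses the reserved prefix `{}`",
            alias_name, EXTRACTED_EXPR_PREFIX
        ))
    } else {
        Ok(())
    }
}
```

Here `looks_extracted("__datafusion_extracted_by_user")` is true even though no optimizer rule produced that alias, which is the case that ends up being pushed down.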



##########
datafusion/optimizer/src/extract_leaf_expressions.rs:
##########
@@ -95,9 +101,533 @@ impl OptimizerRule for ExtractLeafExpressions {
     fn rewrite(
         &self,
         plan: LogicalPlan,
-        _config: &dyn OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        Ok(Transformed::no(plan))
+        let alias_generator = config.alias_generator();
+        extract_from_plan(plan, alias_generator)
+    }
+}
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from a plan node.
+///
+/// Works for any number of inputs (0, 1, 2, …N). For multi-input nodes
+/// like Join, each extracted sub-expression is routed to the correct input
+/// by checking which input's schema contains all of the expression's column
+/// references.
+fn extract_from_plan(
+    plan: LogicalPlan,
+    alias_generator: &Arc<AliasGenerator>,
+) -> Result<Transformed<LogicalPlan>> {
+    // Only extract from plan types whose output schema is predictable after
+    // expression rewriting.  Nodes like Window derive column names from
+    // their expressions, so rewriting `get_field` inside a window function
+    // changes the output schema and breaks the recovery projection.
+    if !matches!(
+        &plan,
+        LogicalPlan::Aggregate(_)
+            | LogicalPlan::Filter(_)
+            | LogicalPlan::Sort(_)
+            | LogicalPlan::Limit(_)
+            | LogicalPlan::Join(_)
+    ) {
+        return Ok(Transformed::no(plan));
+    }
+
+    let inputs = plan.inputs();
+    if inputs.is_empty() {
+        return Ok(Transformed::no(plan));
+    }
+
+    // Save original output schema before any transformation
+    let original_schema = Arc::clone(plan.schema());
+
+    // Build per-input schemas from borrowed inputs (before plan is consumed
+    // by map_expressions). We only need schemas and column sets for routing;
+    // the actual inputs are cloned later only if extraction succeeds.
+    let input_schemas: Vec<Arc<DFSchema>> =
+        inputs.iter().map(|i| Arc::clone(i.schema())).collect();
+
+    // Build per-input extractors
+    let mut extractors: Vec<LeafExpressionExtractor> = input_schemas
+        .iter()
+        .map(|schema| LeafExpressionExtractor::new(schema.as_ref(), alias_generator))
+        .collect();
+
+    // Build per-input column sets for routing expressions to the correct input
+    let input_column_sets: Vec<std::collections::HashSet<Column>> = input_schemas
+        .iter()
+        .map(|schema| schema_columns(schema.as_ref()))
+        .collect();
+
+    // Transform expressions via map_expressions with routing
+    let transformed = plan.map_expressions(|expr| {
+        routing_extract(expr, &mut extractors, &input_column_sets)
+    })?;
+
+    // If no expressions were rewritten, nothing was extracted
+    if !transformed.transformed {
+        return Ok(transformed);
+    }
+
+    // Clone inputs now that we know extraction succeeded. Wrap in Arc
+    // upfront since build_extraction_projection expects &Arc<LogicalPlan>.
+    let owned_inputs: Vec<Arc<LogicalPlan>> = transformed
+        .data
+        .inputs()
+        .into_iter()
+        .map(|i| Arc::new(i.clone()))
+        .collect();
+
+    // Build per-input extraction projections (None means no extractions for that input)
+    let new_inputs: Vec<LogicalPlan> = owned_inputs
+        .into_iter()
+        .zip(extractors.iter())
+        .map(|(input_arc, extractor)| {
+            match extractor.build_extraction_projection(&input_arc)? {
+                Some(plan) => Ok(plan),
+                // No extractions for this input — recover the LogicalPlan
+                // without cloning (refcount is 1 since build returned None).
+                None => {
+                    Ok(Arc::try_unwrap(input_arc).unwrap_or_else(|arc| (*arc).clone()))
+                }
+            }
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    // Rebuild the plan keeping its rewritten expressions but replacing
+    // inputs with the new extraction projections.
+    let new_plan = transformed
+        .data
+        .with_new_exprs(transformed.data.expressions(), new_inputs)?;
+
+    // Add recovery projection if the output schema changed
+    let recovered = build_recovery_projection(original_schema.as_ref(), new_plan)?;
+
+    Ok(Transformed::yes(recovered))
+}
+
+/// Given an expression, returns the index of the input whose columns fully
+/// cover the expression's column references.
+/// Returns `None` if the expression references columns from multiple inputs
+/// or if multiple inputs match (ambiguous, e.g. unqualified columns present
+/// in both sides of a join).
+fn find_owning_input(
+    expr: &Expr,
+    input_column_sets: &[std::collections::HashSet<Column>],
+) -> Option<usize> {
+    let mut found = None;
+    for (idx, cols) in input_column_sets.iter().enumerate() {
+        if has_all_column_refs(expr, cols) {
+            if found.is_some() {
+                // Ambiguous — multiple inputs match
+                return None;
+            }
+            found = Some(idx);
+        }
+    }
+    found
+}
+
+/// Walks an expression tree top-down, extracting `MoveTowardsLeafNodes`
+/// sub-expressions and routing each to the correct per-input extractor.
+fn routing_extract(
+    expr: Expr,
+    extractors: &mut [LeafExpressionExtractor],
+    input_column_sets: &[std::collections::HashSet<Column>],
+) -> Result<Transformed<Expr>> {
+    expr.transform_down(|e| {
+        // Skip expressions already aliased with extracted expression pattern
+        if let Expr::Alias(alias) = &e
+            && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
+        {
+            return Ok(Transformed {
+                data: e,
+                transformed: false,
+                tnr: TreeNodeRecursion::Jump,
+            });
+        }

Review Comment:
   `__datafusion_extracted*` detection is string-prefix based. A user alias
   with that prefix will trigger `TreeNodeRecursion::Jump` and skip subtree
   traversal. It's worth documenting the prefix as reserved, or gating on
   metadata instead of the name.
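
A sketch of what gating on metadata could look like, under stated assumptions: `SimpleAlias` is a simplified stand-in for the real `Alias`, and the marker key `datafusion.optimizer.extracted` is invented for illustration. The point is only that two aliases with the same name can be told apart when the optimizer tags the ones it generates.

```rust
use std::collections::BTreeMap;

// Hypothetical marker key; not an existing DataFusion constant.
const EXTRACTED_MARKER_KEY: &str = "datafusion.optimizer.extracted";

// Simplified stand-in for an alias: a name plus string metadata.
#[derive(Debug, Clone)]
struct SimpleAlias {
    name: String,
    metadata: BTreeMap<String, String>,
}

// What the optimizer would do when it generates an extraction alias:
// attach the marker in addition to the generated name.
fn new_extracted_alias(id: usize) -> SimpleAlias {
    let mut metadata = BTreeMap::new();
    metadata.insert(EXTRACTED_MARKER_KEY.to_string(), "true".to_string());
    SimpleAlias {
        name: format!("__datafusion_extracted_{}", id),
        metadata,
    }
}

// A user-written alias carries no marker, whatever its name is.
fn user_alias(name: &str) -> SimpleAlias {
    SimpleAlias {
        name: name.to_string(),
        metadata: BTreeMap::new(),
    }
}

// Gate on the marker instead of on `name.starts_with(...)`.
fn is_optimizer_extracted(alias: &SimpleAlias) -> bool {
    alias.metadata.get(EXTRACTED_MARKER_KEY).map(String::as_str) == Some("true")
}
```

The trade-off is that the marker has to survive every alias rewrite, which ties back to the metadata-preservation question on `merge_consecutive_projections`.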



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
