adriangb commented on code in PR #20117:
URL: https://github.com/apache/datafusion/pull/20117#discussion_r2801263356


##########
datafusion/optimizer/src/extract_leaf_expressions.rs:
##########
@@ -95,9 +101,533 @@ impl OptimizerRule for ExtractLeafExpressions {
     fn rewrite(
         &self,
         plan: LogicalPlan,
-        _config: &dyn OptimizerConfig,
+        config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        Ok(Transformed::no(plan))
+        let alias_generator = config.alias_generator();
+        extract_from_plan(plan, alias_generator)
+    }
+}
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from a plan node.
+///
+/// Works for any number of inputs (0, 1, 2, …N). For multi-input nodes
+/// like Join, each extracted sub-expression is routed to the correct input
+/// by checking which input's schema contains all of the expression's column
+/// references.
+fn extract_from_plan(
+    plan: LogicalPlan,
+    alias_generator: &Arc<AliasGenerator>,
+) -> Result<Transformed<LogicalPlan>> {
+    // Only extract from plan types whose output schema is predictable after
+    // expression rewriting.  Nodes like Window derive column names from
+    // their expressions, so rewriting `get_field` inside a window function
+    // changes the output schema and breaks the recovery projection.
+    if !matches!(
+        &plan,
+        LogicalPlan::Aggregate(_)
+            | LogicalPlan::Filter(_)
+            | LogicalPlan::Sort(_)
+            | LogicalPlan::Limit(_)
+            | LogicalPlan::Join(_)
+    ) {
+        return Ok(Transformed::no(plan));
+    }
+
+    let inputs = plan.inputs();
+    if inputs.is_empty() {
+        return Ok(Transformed::no(plan));
+    }
+
+    // Save original output schema before any transformation
+    let original_schema = Arc::clone(plan.schema());
+
+    // Build per-input schemas from borrowed inputs (before plan is consumed
+    // by map_expressions). We only need schemas and column sets for routing;
+    // the actual inputs are cloned later only if extraction succeeds.
+    let input_schemas: Vec<Arc<DFSchema>> =
+        inputs.iter().map(|i| Arc::clone(i.schema())).collect();
+
+    // Build per-input extractors
+    let mut extractors: Vec<LeafExpressionExtractor> = input_schemas
+        .iter()
+        .map(|schema| LeafExpressionExtractor::new(schema.as_ref(), alias_generator))
+        .collect();
+
+    // Build per-input column sets for routing expressions to the correct input
+    let input_column_sets: Vec<std::collections::HashSet<Column>> = input_schemas
+        .iter()
+        .map(|schema| schema_columns(schema.as_ref()))
+        .collect();
+
+    // Transform expressions via map_expressions with routing
+    let transformed = plan.map_expressions(|expr| {
+        routing_extract(expr, &mut extractors, &input_column_sets)
+    })?;
+
+    // If no expressions were rewritten, nothing was extracted
+    if !transformed.transformed {
+        return Ok(transformed);
+    }
+
+    // Clone inputs now that we know extraction succeeded. Wrap in Arc
+    // upfront since build_extraction_projection expects &Arc<LogicalPlan>.
+    let owned_inputs: Vec<Arc<LogicalPlan>> = transformed
+        .data
+        .inputs()
+        .into_iter()
+        .map(|i| Arc::new(i.clone()))
+        .collect();
+
+    // Build per-input extraction projections (None means no extractions for that input)
+    let new_inputs: Vec<LogicalPlan> = owned_inputs
+        .into_iter()
+        .zip(extractors.iter())
+        .map(|(input_arc, extractor)| {
+            match extractor.build_extraction_projection(&input_arc)? {
+                Some(plan) => Ok(plan),
+                // No extractions for this input — recover the LogicalPlan
+                // without cloning (refcount is 1 since build returned None).
+                None => {
+                    Ok(Arc::try_unwrap(input_arc).unwrap_or_else(|arc| (*arc).clone()))
+                }
+            }
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    // Rebuild the plan keeping its rewritten expressions but replacing
+    // inputs with the new extraction projections.
+    let new_plan = transformed
+        .data
+        .with_new_exprs(transformed.data.expressions(), new_inputs)?;
+
+    // Add recovery projection if the output schema changed
+    let recovered = build_recovery_projection(original_schema.as_ref(), new_plan)?;
+
+    Ok(Transformed::yes(recovered))
+}
+
+/// Given an expression, returns the index of the input whose columns fully
+/// cover the expression's column references.
+/// Returns `None` if the expression references columns from multiple inputs
+/// or if multiple inputs match (ambiguous, e.g. unqualified columns present
+/// in both sides of a join).
+fn find_owning_input(
+    expr: &Expr,
+    input_column_sets: &[std::collections::HashSet<Column>],
+) -> Option<usize> {
+    let mut found = None;
+    for (idx, cols) in input_column_sets.iter().enumerate() {
+        if has_all_column_refs(expr, cols) {
+            if found.is_some() {
+                // Ambiguous — multiple inputs match
+                return None;
+            }
+            found = Some(idx);
+        }
+    }
+    found
+}
+
+/// Walks an expression tree top-down, extracting `MoveTowardsLeafNodes`
+/// sub-expressions and routing each to the correct per-input extractor.
+fn routing_extract(
+    expr: Expr,
+    extractors: &mut [LeafExpressionExtractor],
+    input_column_sets: &[std::collections::HashSet<Column>],
+) -> Result<Transformed<Expr>> {
+    expr.transform_down(|e| {
+        // Skip expressions already aliased with extracted expression pattern
+        if let Expr::Alias(alias) = &e
+            && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
+        {
+            return Ok(Transformed {
+                data: e,
+                transformed: false,
+                tnr: TreeNodeRecursion::Jump,
+            });
+        }

Review Comment:
   I think users are just as likely to create some metadata marker as they are
   to create aliases like `__datafusion_extracted`. There is prior art for
   using "special" aliases like this in the CSE rule. I don't love it, but it
   does work. I could add an extra check for metadata, but let's leave that for
   the day a user reports an error with the name detection.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to