adriangb commented on code in PR #20117: URL: https://github.com/apache/datafusion/pull/20117#discussion_r2756607559
########## datafusion/optimizer/src/extract_leaf_expressions.rs: ########## @@ -0,0 +1,1870 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ExtractLeafExpressions`] extracts `MoveTowardsLeafNodes` sub-expressions into projections. +//! +//! This optimizer rule normalizes the plan so that all `MoveTowardsLeafNodes` computations +//! (like field accessors) live in Projection nodes immediately above scan nodes, making them +//! eligible for pushdown by the `OptimizeProjections` rule. +//! +//! ## Algorithm +//! +//! This rule uses **BottomUp** traversal to push ALL `MoveTowardsLeafNodes` expressions +//! (like `get_field`) to projections immediately above scan nodes. This enables optimal +//! Parquet column pruning. +//! +//! ### Node Classification +//! +//! **Barrier Nodes** (stop pushing, create projection above): +//! - `TableScan` - the leaf, ideal extraction point +//! - `Join` - requires routing to left/right sides +//! - `Aggregate` - changes schema semantics +//! - `SubqueryAlias` - scope boundary +//! - `Union`, `Intersect`, `Except` - schema boundaries +//! +//! **Schema-Preserving Nodes** (push through): +//! - `Filter` - passes all input columns through +//! 
- `Sort` - passes all input columns through +//! - `Limit` - passes all input columns through +//! - Passthrough `Projection` - only column references +//! +//! ### How It Works +//! +//! 1. Process leaf nodes first (TableScan, etc.) +//! 2. When processing higher nodes, descendants are already finalized +//! 3. Push extractions DOWN through the plan, merging into existing extracted +//! expression projections when possible + +use indexmap::{IndexMap, IndexSet}; +use std::sync::Arc; + +use datafusion_common::alias::AliasGenerator; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion_common::{Column, DFSchema, Result}; +use datafusion_expr::expr_rewriter::NamePreserver; +use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::{Expr, ExpressionPlacement, Filter, Limit, Projection, Sort}; + +use crate::optimizer::ApplyOrder; +use crate::utils::{EXTRACTED_EXPR_PREFIX, has_all_column_refs, is_extracted_expr_projection}; +use crate::{OptimizerConfig, OptimizerRule}; + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from all nodes into projections. +/// +/// This normalizes the plan so that all `MoveTowardsLeafNodes` computations (like field +/// accessors) live in Projection nodes, making them eligible for pushdown. +/// +/// # Example +/// +/// Given a filter with a struct field access: +/// +/// ```text +/// Filter: user['status'] = 'active' +/// TableScan: t [user] +/// ``` +/// +/// This rule extracts the field access into a projection: +/// +/// ```text +/// Filter: __datafusion_extracted_1 = 'active' +/// Projection: user['status'] AS __datafusion_extracted_1, user +/// TableScan: t [user] +/// ``` +/// +/// The `OptimizeProjections` rule can then push this projection down to the scan. +/// +/// **Important:** The `PushDownFilter` rule is aware of projections created by this rule +/// and will not push filters through them. See `is_extracted_expr_projection` in utils.rs. 
+#[derive(Default, Debug)] +pub struct ExtractLeafExpressions {} + +impl ExtractLeafExpressions { + /// Create a new [`ExtractLeafExpressions`] + pub fn new() -> Self { + Self {} + } +} + +impl OptimizerRule for ExtractLeafExpressions { + fn name(&self) -> &str { + "extract_leaf_expressions" + } + + fn apply_order(&self) -> Option<ApplyOrder> { + Some(ApplyOrder::BottomUp) + } + + fn rewrite( + &self, + plan: LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Result<Transformed<LogicalPlan>> { + let alias_generator = config.alias_generator(); + extract_from_plan(plan, alias_generator) + } +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from a plan node. +/// +/// With BottomUp traversal, we process leaves first, then work up. +/// This allows us to push extractions all the way down to scan nodes. +fn extract_from_plan( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + match &plan { + // Schema-preserving nodes - extract and push down + LogicalPlan::Filter(_) | LogicalPlan::Sort(_) | LogicalPlan::Limit(_) => { + extract_from_schema_preserving(plan, alias_generator) + } + + // Schema-transforming nodes need special handling + LogicalPlan::Aggregate(_) => extract_from_aggregate(plan, alias_generator), + LogicalPlan::Projection(_) => extract_from_projection(plan, alias_generator), + LogicalPlan::Join(_) => extract_from_join(plan, alias_generator), + + // Everything else passes through unchanged + _ => Ok(Transformed::no(plan)), Review Comment: I'm not sure what else we could handle here. Maybe Extension? Possibly, before we merge this PR, we should expand this to explicitly ignore all other nodes, so that if a new node is added one has to decide how this rule should handle it. ########## datafusion/optimizer/src/extract_leaf_expressions.rs: ########## @@ -0,0 +1,1870 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ExtractLeafExpressions`] extracts `MoveTowardsLeafNodes` sub-expressions into projections. +//! +//! This optimizer rule normalizes the plan so that all `MoveTowardsLeafNodes` computations +//! (like field accessors) live in Projection nodes immediately above scan nodes, making them +//! eligible for pushdown by the `OptimizeProjections` rule. +//! +//! ## Algorithm +//! +//! This rule uses **BottomUp** traversal to push ALL `MoveTowardsLeafNodes` expressions +//! (like `get_field`) to projections immediately above scan nodes. This enables optimal +//! Parquet column pruning. +//! +//! ### Node Classification +//! +//! **Barrier Nodes** (stop pushing, create projection above): +//! - `TableScan` - the leaf, ideal extraction point +//! - `Join` - requires routing to left/right sides +//! - `Aggregate` - changes schema semantics +//! - `SubqueryAlias` - scope boundary +//! - `Union`, `Intersect`, `Except` - schema boundaries +//! +//! **Schema-Preserving Nodes** (push through): +//! - `Filter` - passes all input columns through +//! - `Sort` - passes all input columns through +//! - `Limit` - passes all input columns through +//! - Passthrough `Projection` - only column references +//! +//! ### How It Works +//! +//! 1. Process leaf nodes first (TableScan, etc.) +//! 
2. When processing higher nodes, descendants are already finalized +//! 3. Push extractions DOWN through the plan, merging into existing extracted +//! expression projections when possible + +use indexmap::{IndexMap, IndexSet}; +use std::sync::Arc; + +use datafusion_common::alias::AliasGenerator; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion_common::{Column, DFSchema, Result}; +use datafusion_expr::expr_rewriter::NamePreserver; +use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::{Expr, ExpressionPlacement, Filter, Limit, Projection, Sort}; + +use crate::optimizer::ApplyOrder; +use crate::utils::{EXTRACTED_EXPR_PREFIX, has_all_column_refs, is_extracted_expr_projection}; +use crate::{OptimizerConfig, OptimizerRule}; + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from all nodes into projections. +/// +/// This normalizes the plan so that all `MoveTowardsLeafNodes` computations (like field +/// accessors) live in Projection nodes, making them eligible for pushdown. +/// +/// # Example +/// +/// Given a filter with a struct field access: +/// +/// ```text +/// Filter: user['status'] = 'active' +/// TableScan: t [user] +/// ``` +/// +/// This rule extracts the field access into a projection: +/// +/// ```text +/// Filter: __datafusion_extracted_1 = 'active' +/// Projection: user['status'] AS __datafusion_extracted_1, user +/// TableScan: t [user] +/// ``` +/// +/// The `OptimizeProjections` rule can then push this projection down to the scan. +/// +/// **Important:** The `PushDownFilter` rule is aware of projections created by this rule +/// and will not push filters through them. See `is_extracted_expr_projection` in utils.rs. 
+#[derive(Default, Debug)] +pub struct ExtractLeafExpressions {} + +impl ExtractLeafExpressions { + /// Create a new [`ExtractLeafExpressions`] + pub fn new() -> Self { + Self {} + } +} + +impl OptimizerRule for ExtractLeafExpressions { + fn name(&self) -> &str { + "extract_leaf_expressions" + } + + fn apply_order(&self) -> Option<ApplyOrder> { + Some(ApplyOrder::BottomUp) + } + + fn rewrite( + &self, + plan: LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Result<Transformed<LogicalPlan>> { + let alias_generator = config.alias_generator(); + extract_from_plan(plan, alias_generator) + } +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from a plan node. +/// +/// With BottomUp traversal, we process leaves first, then work up. +/// This allows us to push extractions all the way down to scan nodes. +fn extract_from_plan( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + match &plan { + // Schema-preserving nodes - extract and push down + LogicalPlan::Filter(_) | LogicalPlan::Sort(_) | LogicalPlan::Limit(_) => { + extract_from_schema_preserving(plan, alias_generator) + } + + // Schema-transforming nodes need special handling + LogicalPlan::Aggregate(_) => extract_from_aggregate(plan, alias_generator), + LogicalPlan::Projection(_) => extract_from_projection(plan, alias_generator), + LogicalPlan::Join(_) => extract_from_join(plan, alias_generator), + + // Everything else passes through unchanged + _ => Ok(Transformed::no(plan)), + } +} + +/// Extracts from schema-preserving nodes (Filter, Sort, Limit). +/// +/// These nodes don't change the schema, so we can extract expressions +/// and push them down to existing extracted projections or create new ones. +/// +/// Uses CSE's two-level pattern: +/// 1. Inner extraction projection with ALL columns passed through +/// 2. 
Outer recovery projection to restore original schema +fn extract_from_schema_preserving( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + // Skip nodes with no children + if plan.inputs().is_empty() { + return Ok(Transformed::no(plan)); + } + + let input = plan.inputs()[0].clone(); + let input_schema = Arc::clone(input.schema()); + + // Find where to place extractions (look down through schema-preserving nodes) + let input_arc = Arc::new(input); + let (target, path) = find_extraction_target(&input_arc); + let target_schema = Arc::clone(target.schema()); + + // Extract using target schema - this is where the projection will be placed + let mut extractor = + LeafExpressionExtractor::new(target_schema.as_ref(), alias_generator); + + // Transform expressions + let transformed = plan.map_expressions(|expr| extractor.extract(expr))?; + + if !extractor.has_extractions() { + return Ok(transformed); + } + + // Build extraction projection with ALL columns (CSE-style) + let extraction_proj = if let Some(existing_proj) = get_extracted_projection(&target) { + merge_into_extracted_projection(existing_proj, &extractor)? + } else { + extractor.build_projection_with_all_columns(target)? + }; + + // Rebuild the path from target back up to our node's input + let rebuilt_input = rebuild_path(path, LogicalPlan::Projection(extraction_proj))?; + + // Create the node with new input + let new_inputs: Vec<LogicalPlan> = std::iter::once(rebuilt_input) + .chain( + transformed + .data + .inputs() + .iter() + .skip(1) + .map(|p| (*p).clone()), + ) + .collect(); + + let new_plan = transformed + .data + .with_new_exprs(transformed.data.expressions(), new_inputs)?; + + // Use CSE's pattern: add recovery projection to restore original schema + let recovered = build_recover_project_plan(input_schema.as_ref(), new_plan)?; + + Ok(Transformed::yes(recovered)) +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from Join nodes. 
+/// +/// For Joins, we extract from: +/// - `on` expressions: pairs of (left_key, right_key) for equijoin +/// - `filter` expression: non-equi join conditions +/// +/// Each expression is routed to the appropriate side (left or right) based on +/// which columns it references. Expressions referencing columns from both sides +/// cannot have sub-expressions extracted (they must remain in the filter). +fn extract_from_join( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + let LogicalPlan::Join(join) = plan else { + return Ok(Transformed::no(plan)); + }; + + let left_schema = join.left.schema(); + let right_schema = join.right.schema(); + + // Create extractors for left and right sides + // Find extraction targets for each side (look through schema-preserving nodes) + let (left_target, left_path) = find_extraction_target(&join.left); + let (right_target, right_path) = find_extraction_target(&join.right); + + let left_target_schema = Arc::clone(left_target.schema()); + let right_target_schema = Arc::clone(right_target.schema()); + + let mut left_extractor = + LeafExpressionExtractor::new(left_target_schema.as_ref(), alias_generator); + let mut right_extractor = + LeafExpressionExtractor::new(right_target_schema.as_ref(), alias_generator); + + // Build column checker to route expressions to correct side + let mut column_checker = ColumnChecker::new(left_schema.as_ref(), right_schema.as_ref()); + + // Extract from `on` expressions (equijoin keys) + let mut new_on = Vec::with_capacity(join.on.len()); + let mut any_extracted = false; + + for (left_key, right_key) in &join.on { + // Left key should reference only left columns + let new_left = left_extractor.extract(left_key.clone())?; + if new_left.transformed { + any_extracted = true; + } + + // Right key should reference only right columns + let new_right = right_extractor.extract(right_key.clone())?; + if new_right.transformed { + any_extracted = true; + } + + 
new_on.push((new_left.data, new_right.data)); + } + + // Extract from `filter` expression + let new_filter = if let Some(ref filter) = join.filter { + let extracted = extract_from_join_filter( + filter.clone(), + &mut column_checker, + &mut left_extractor, + &mut right_extractor, + )?; + if extracted.transformed { + any_extracted = true; + } + Some(extracted.data) + } else { + None + }; + + if !any_extracted { + return Ok(Transformed::no(LogicalPlan::Join(join))); + } + + // Save original schema before modifying inputs + let original_schema = Arc::clone(&join.schema); + + // Build left extraction projection if needed + let new_left = if left_extractor.has_extractions() { + let extraction_proj = if let Some(existing_proj) = get_extracted_projection(&left_target) { + merge_into_extracted_projection(existing_proj, &left_extractor)? + } else { + left_extractor.build_projection_with_all_columns(left_target)? + }; + Arc::new(rebuild_path(left_path, LogicalPlan::Projection(extraction_proj))?) + } else { + Arc::clone(&join.left) + }; + + // Build right extraction projection if needed + let new_right = if right_extractor.has_extractions() { + let extraction_proj = if let Some(existing_proj) = get_extracted_projection(&right_target) { + merge_into_extracted_projection(existing_proj, &right_extractor)? + } else { + right_extractor.build_projection_with_all_columns(right_target)? + }; + Arc::new(rebuild_path(right_path, LogicalPlan::Projection(extraction_proj))?) 
+ } else { + Arc::clone(&join.right) + }; + + // Create new Join with updated inputs and expressions + let new_join = datafusion_expr::logical_plan::Join::try_new( + new_left, + new_right, + new_on, + new_filter, + join.join_type, + join.join_constraint, + join.null_equality, + join.null_aware, + )?; + + // Add recovery projection to restore original schema + // This hides the intermediate extracted expression columns + let recovered = build_recover_project_plan(original_schema.as_ref(), LogicalPlan::Join(new_join))?; + + Ok(Transformed::yes(recovered)) +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from a join filter expression. +/// +/// For each sub-expression, determines if it references only left, only right, +/// or both columns, and routes extractions accordingly. +fn extract_from_join_filter( + filter: Expr, + column_checker: &mut ColumnChecker, + left_extractor: &mut LeafExpressionExtractor, + right_extractor: &mut LeafExpressionExtractor, +) -> Result<Transformed<Expr>> { + filter.transform_down(|expr| { + // Skip expressions already aliased with extracted expression pattern + if let Expr::Alias(alias) = &expr + && alias.name.starts_with(EXTRACTED_EXPR_PREFIX) + { + return Ok(Transformed { + data: expr, + transformed: false, + tnr: TreeNodeRecursion::Jump, + }); + } + + match expr.placement() { + ExpressionPlacement::MoveTowardsLeafNodes => { + // Check which side this expression belongs to + if column_checker.is_left_only(&expr) { + // Extract to left side + let col_ref = left_extractor.add_extracted(expr)?; + Ok(Transformed::yes(col_ref)) + } else if column_checker.is_right_only(&expr) { + // Extract to right side + let col_ref = right_extractor.add_extracted(expr)?; + Ok(Transformed::yes(col_ref)) + } else { + // References both sides - cannot extract, keep in place + // This shouldn't typically happen for MoveTowardsLeafNodes expressions + // but we handle it gracefully + Ok(Transformed::no(expr)) + } + } + ExpressionPlacement::Column => { 
+ // Track columns for pass-through on appropriate side + if let Expr::Column(col) = &expr { + if column_checker.is_left_only(&expr) { + left_extractor.columns_needed.insert(col.clone()); + } else if column_checker.is_right_only(&expr) { + right_extractor.columns_needed.insert(col.clone()); + } + } + Ok(Transformed::no(expr)) + } + _ => { + // Continue recursing into children + Ok(Transformed::no(expr)) + } + } + }) +} + +/// Evaluates the columns referenced in the given expression to see if they refer +/// only to the left or right columns of a join. +struct ColumnChecker<'a> { + left_schema: &'a DFSchema, + left_columns: Option<std::collections::HashSet<Column>>, + right_schema: &'a DFSchema, + right_columns: Option<std::collections::HashSet<Column>>, +} + +impl<'a> ColumnChecker<'a> { + fn new(left_schema: &'a DFSchema, right_schema: &'a DFSchema) -> Self { + Self { + left_schema, + left_columns: None, + right_schema, + right_columns: None, + } + } + + /// Return true if the expression references only columns from the left side + fn is_left_only(&mut self, predicate: &Expr) -> bool { + if self.left_columns.is_none() { + self.left_columns = Some(schema_columns(self.left_schema)); + } + has_all_column_refs(predicate, self.left_columns.as_ref().unwrap()) + } + + /// Return true if the expression references only columns from the right side + fn is_right_only(&mut self, predicate: &Expr) -> bool { + if self.right_columns.is_none() { + self.right_columns = Some(schema_columns(self.right_schema)); + } + has_all_column_refs(predicate, self.right_columns.as_ref().unwrap()) + } +} + +/// Returns all columns in the schema (both qualified and unqualified forms) +fn schema_columns(schema: &DFSchema) -> std::collections::HashSet<Column> { + schema + .iter() + .flat_map(|(qualifier, field)| { + [ + Column::new(qualifier.cloned(), field.name()), + Column::new_unqualified(field.name()), + ] + }) + .collect() +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from 
Aggregate nodes. +/// +/// For Aggregates, we extract from: +/// - Group-by expressions (full expressions or sub-expressions) +/// - Arguments inside aggregate functions (NOT the aggregate function itself) +/// +/// Uses CSE's two-level pattern with NamePreserver for stable name handling. +fn extract_from_aggregate( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + let LogicalPlan::Aggregate(agg) = plan else { + return Ok(Transformed::no(plan)); + }; + + // Save original expression names using NamePreserver (like CSE) + let name_preserver = NamePreserver::new_for_projection(); + let saved_group_names: Vec<_> = agg + .group_expr + .iter() + .map(|e| name_preserver.save(e)) + .collect(); + let saved_aggr_names: Vec<_> = agg + .aggr_expr + .iter() + .map(|e| name_preserver.save(e)) + .collect(); + + // Find where to place extractions + let (target, path) = find_extraction_target(&agg.input); + let target_schema = Arc::clone(target.schema()); + + let mut extractor = + LeafExpressionExtractor::new(target_schema.as_ref(), alias_generator); + + // Extract from group-by expressions + let mut new_group_by = Vec::with_capacity(agg.group_expr.len()); + let mut has_extractions = false; + + for expr in &agg.group_expr { + let transformed = extractor.extract(expr.clone())?; + if transformed.transformed { + has_extractions = true; + } + new_group_by.push(transformed.data); + } + + // Extract from aggregate function arguments (not the function itself) + let mut new_aggr = Vec::with_capacity(agg.aggr_expr.len()); + + for expr in &agg.aggr_expr { + let transformed = extract_from_aggregate_args(expr.clone(), &mut extractor)?; + if transformed.transformed { + has_extractions = true; + } + new_aggr.push(transformed.data); + } + + if !has_extractions { + return Ok(Transformed::no(LogicalPlan::Aggregate(agg))); + } + + // Build extraction projection with ALL columns (CSE-style) + let extraction_proj = if let Some(existing_proj) = 
get_extracted_projection(&target) { + merge_into_extracted_projection(existing_proj, &extractor)? + } else { + extractor.build_projection_with_all_columns(target)? + }; + + // Rebuild path from target back up + let rebuilt_input = rebuild_path(path, LogicalPlan::Projection(extraction_proj))?; + + // Restore names in group-by expressions using NamePreserver + let restored_group_expr: Vec<Expr> = new_group_by + .into_iter() + .zip(saved_group_names) + .map(|(expr, saved)| saved.restore(expr)) + .collect(); + + // Restore names in aggregate expressions using NamePreserver + let restored_aggr_expr: Vec<Expr> = new_aggr + .into_iter() + .zip(saved_aggr_names) + .map(|(expr, saved)| saved.restore(expr)) + .collect(); + + // Create new Aggregate with restored names + // (no outer projection needed if names are properly preserved) + let new_agg = datafusion_expr::logical_plan::Aggregate::try_new( + Arc::new(rebuilt_input), + restored_group_expr, + restored_aggr_expr, + )?; + + Ok(Transformed::yes(LogicalPlan::Aggregate(new_agg))) +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from Projection nodes. +/// +/// Uses CSE's two-level pattern (outer + inner projections only): +/// - Inner projection: extraction with ALL columns passed through +/// - Outer projection: rewritten expressions with restored names +/// +/// This avoids the unstable 3-level structure that gets broken by OptimizeProjections. +fn extract_from_projection( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + let LogicalPlan::Projection(proj) = plan else { + return Ok(Transformed::no(plan)); + }; + + // Skip if this projection is fully extracted (only column references) + if is_fully_extracted(&proj) { + return Ok(Transformed::no(LogicalPlan::Projection(proj))); + } Review Comment: I think this should skip extraction on any expression that has all `ExprPlacement::should_push_to_leaves` (i.e. also include columns). E.g. 
if we have `[id, get_field(s, 'foo')]` we can push down the entire projection without extraction. ########## datafusion/optimizer/src/extract_leaf_expressions.rs: ########## @@ -0,0 +1,1870 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ExtractLeafExpressions`] extracts `MoveTowardsLeafNodes` sub-expressions into projections. +//! +//! This optimizer rule normalizes the plan so that all `MoveTowardsLeafNodes` computations +//! (like field accessors) live in Projection nodes immediately above scan nodes, making them +//! eligible for pushdown by the `OptimizeProjections` rule. +//! +//! ## Algorithm +//! +//! This rule uses **BottomUp** traversal to push ALL `MoveTowardsLeafNodes` expressions +//! (like `get_field`) to projections immediately above scan nodes. This enables optimal +//! Parquet column pruning. +//! +//! ### Node Classification +//! +//! **Barrier Nodes** (stop pushing, create projection above): +//! - `TableScan` - the leaf, ideal extraction point +//! - `Join` - requires routing to left/right sides +//! - `Aggregate` - changes schema semantics +//! - `SubqueryAlias` - scope boundary +//! - `Union`, `Intersect`, `Except` - schema boundaries +//! +//! 
**Schema-Preserving Nodes** (push through): +//! - `Filter` - passes all input columns through +//! - `Sort` - passes all input columns through +//! - `Limit` - passes all input columns through +//! - Passthrough `Projection` - only column references +//! +//! ### How It Works +//! +//! 1. Process leaf nodes first (TableScan, etc.) +//! 2. When processing higher nodes, descendants are already finalized +//! 3. Push extractions DOWN through the plan, merging into existing extracted +//! expression projections when possible + +use indexmap::{IndexMap, IndexSet}; +use std::sync::Arc; + +use datafusion_common::alias::AliasGenerator; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion_common::{Column, DFSchema, Result}; +use datafusion_expr::expr_rewriter::NamePreserver; +use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::{Expr, ExpressionPlacement, Filter, Limit, Projection, Sort}; + +use crate::optimizer::ApplyOrder; +use crate::utils::{EXTRACTED_EXPR_PREFIX, has_all_column_refs, is_extracted_expr_projection}; +use crate::{OptimizerConfig, OptimizerRule}; + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from all nodes into projections. +/// +/// This normalizes the plan so that all `MoveTowardsLeafNodes` computations (like field +/// accessors) live in Projection nodes, making them eligible for pushdown. +/// +/// # Example +/// +/// Given a filter with a struct field access: +/// +/// ```text +/// Filter: user['status'] = 'active' +/// TableScan: t [user] +/// ``` +/// +/// This rule extracts the field access into a projection: +/// +/// ```text +/// Filter: __datafusion_extracted_1 = 'active' +/// Projection: user['status'] AS __datafusion_extracted_1, user +/// TableScan: t [user] +/// ``` +/// +/// The `OptimizeProjections` rule can then push this projection down to the scan. 
+/// +/// **Important:** The `PushDownFilter` rule is aware of projections created by this rule +/// and will not push filters through them. See `is_extracted_expr_projection` in utils.rs. +#[derive(Default, Debug)] +pub struct ExtractLeafExpressions {} + +impl ExtractLeafExpressions { + /// Create a new [`ExtractLeafExpressions`] + pub fn new() -> Self { + Self {} + } +} + +impl OptimizerRule for ExtractLeafExpressions { + fn name(&self) -> &str { + "extract_leaf_expressions" + } + + fn apply_order(&self) -> Option<ApplyOrder> { + Some(ApplyOrder::BottomUp) + } + + fn rewrite( + &self, + plan: LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Result<Transformed<LogicalPlan>> { + let alias_generator = config.alias_generator(); + extract_from_plan(plan, alias_generator) + } +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from a plan node. +/// +/// With BottomUp traversal, we process leaves first, then work up. +/// This allows us to push extractions all the way down to scan nodes. +fn extract_from_plan( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + match &plan { + // Schema-preserving nodes - extract and push down + LogicalPlan::Filter(_) | LogicalPlan::Sort(_) | LogicalPlan::Limit(_) => { + extract_from_schema_preserving(plan, alias_generator) + } + + // Schema-transforming nodes need special handling + LogicalPlan::Aggregate(_) => extract_from_aggregate(plan, alias_generator), + LogicalPlan::Projection(_) => extract_from_projection(plan, alias_generator), + LogicalPlan::Join(_) => extract_from_join(plan, alias_generator), + + // Everything else passes through unchanged + _ => Ok(Transformed::no(plan)), + } +} + +/// Extracts from schema-preserving nodes (Filter, Sort, Limit). +/// +/// These nodes don't change the schema, so we can extract expressions +/// and push them down to existing extracted projections or create new ones. +/// +/// Uses CSE's two-level pattern: +/// 1. 
Inner extraction projection with ALL columns passed through +/// 2. Outer recovery projection to restore original schema +fn extract_from_schema_preserving( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + // Skip nodes with no children + if plan.inputs().is_empty() { + return Ok(Transformed::no(plan)); + } + + let input = plan.inputs()[0].clone(); + let input_schema = Arc::clone(input.schema()); + + // Find where to place extractions (look down through schema-preserving nodes) + let input_arc = Arc::new(input); + let (target, path) = find_extraction_target(&input_arc); + let target_schema = Arc::clone(target.schema()); + + // Extract using target schema - this is where the projection will be placed + let mut extractor = + LeafExpressionExtractor::new(target_schema.as_ref(), alias_generator); + + // Transform expressions + let transformed = plan.map_expressions(|expr| extractor.extract(expr))?; + + if !extractor.has_extractions() { + return Ok(transformed); + } + + // Build extraction projection with ALL columns (CSE-style) + let extraction_proj = if let Some(existing_proj) = get_extracted_projection(&target) { + merge_into_extracted_projection(existing_proj, &extractor)? + } else { + extractor.build_projection_with_all_columns(target)? 
+ }; + + // Rebuild the path from target back up to our node's input + let rebuilt_input = rebuild_path(path, LogicalPlan::Projection(extraction_proj))?; + + // Create the node with new input + let new_inputs: Vec<LogicalPlan> = std::iter::once(rebuilt_input) + .chain( + transformed + .data + .inputs() + .iter() + .skip(1) + .map(|p| (*p).clone()), + ) + .collect(); + + let new_plan = transformed + .data + .with_new_exprs(transformed.data.expressions(), new_inputs)?; + + // Use CSE's pattern: add recovery projection to restore original schema + let recovered = build_recover_project_plan(input_schema.as_ref(), new_plan)?; + + Ok(Transformed::yes(recovered)) +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from Join nodes. +/// +/// For Joins, we extract from: +/// - `on` expressions: pairs of (left_key, right_key) for equijoin +/// - `filter` expression: non-equi join conditions +/// +/// Each expression is routed to the appropriate side (left or right) based on +/// which columns it references. Expressions referencing columns from both sides +/// cannot have sub-expressions extracted (they must remain in the filter). 
+fn extract_from_join( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + let LogicalPlan::Join(join) = plan else { + return Ok(Transformed::no(plan)); + }; + + let left_schema = join.left.schema(); + let right_schema = join.right.schema(); + + // Create extractors for left and right sides + // Find extraction targets for each side (look through schema-preserving nodes) + let (left_target, left_path) = find_extraction_target(&join.left); + let (right_target, right_path) = find_extraction_target(&join.right); + + let left_target_schema = Arc::clone(left_target.schema()); + let right_target_schema = Arc::clone(right_target.schema()); + + let mut left_extractor = + LeafExpressionExtractor::new(left_target_schema.as_ref(), alias_generator); + let mut right_extractor = + LeafExpressionExtractor::new(right_target_schema.as_ref(), alias_generator); + + // Build column checker to route expressions to correct side + let mut column_checker = ColumnChecker::new(left_schema.as_ref(), right_schema.as_ref()); + + // Extract from `on` expressions (equijoin keys) + let mut new_on = Vec::with_capacity(join.on.len()); + let mut any_extracted = false; + + for (left_key, right_key) in &join.on { + // Left key should reference only left columns + let new_left = left_extractor.extract(left_key.clone())?; + if new_left.transformed { + any_extracted = true; + } + + // Right key should reference only right columns + let new_right = right_extractor.extract(right_key.clone())?; + if new_right.transformed { + any_extracted = true; + } + + new_on.push((new_left.data, new_right.data)); + } + + // Extract from `filter` expression + let new_filter = if let Some(ref filter) = join.filter { + let extracted = extract_from_join_filter( + filter.clone(), + &mut column_checker, + &mut left_extractor, + &mut right_extractor, + )?; + if extracted.transformed { + any_extracted = true; + } + Some(extracted.data) + } else { + None + }; + + if 
!any_extracted { + return Ok(Transformed::no(LogicalPlan::Join(join))); + } + + // Save original schema before modifying inputs + let original_schema = Arc::clone(&join.schema); + + // Build left extraction projection if needed + let new_left = if left_extractor.has_extractions() { + let extraction_proj = if let Some(existing_proj) = get_extracted_projection(&left_target) { + merge_into_extracted_projection(existing_proj, &left_extractor)? + } else { + left_extractor.build_projection_with_all_columns(left_target)? + }; + Arc::new(rebuild_path(left_path, LogicalPlan::Projection(extraction_proj))?) + } else { + Arc::clone(&join.left) + }; + + // Build right extraction projection if needed + let new_right = if right_extractor.has_extractions() { + let extraction_proj = if let Some(existing_proj) = get_extracted_projection(&right_target) { + merge_into_extracted_projection(existing_proj, &right_extractor)? + } else { + right_extractor.build_projection_with_all_columns(right_target)? + }; + Arc::new(rebuild_path(right_path, LogicalPlan::Projection(extraction_proj))?) + } else { + Arc::clone(&join.right) + }; + + // Create new Join with updated inputs and expressions + let new_join = datafusion_expr::logical_plan::Join::try_new( + new_left, + new_right, + new_on, + new_filter, + join.join_type, + join.join_constraint, + join.null_equality, + join.null_aware, + )?; + + // Add recovery projection to restore original schema + // This hides the intermediate extracted expression columns + let recovered = build_recover_project_plan(original_schema.as_ref(), LogicalPlan::Join(new_join))?; + + Ok(Transformed::yes(recovered)) +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from a join filter expression. +/// +/// For each sub-expression, determines if it references only left, only right, +/// or both columns, and routes extractions accordingly. 
+fn extract_from_join_filter( + filter: Expr, + column_checker: &mut ColumnChecker, + left_extractor: &mut LeafExpressionExtractor, + right_extractor: &mut LeafExpressionExtractor, +) -> Result<Transformed<Expr>> { + filter.transform_down(|expr| { + // Skip expressions already aliased with extracted expression pattern + if let Expr::Alias(alias) = &expr + && alias.name.starts_with(EXTRACTED_EXPR_PREFIX) + { + return Ok(Transformed { + data: expr, + transformed: false, + tnr: TreeNodeRecursion::Jump, + }); + } + + match expr.placement() { + ExpressionPlacement::MoveTowardsLeafNodes => { + // Check which side this expression belongs to + if column_checker.is_left_only(&expr) { + // Extract to left side + let col_ref = left_extractor.add_extracted(expr)?; + Ok(Transformed::yes(col_ref)) + } else if column_checker.is_right_only(&expr) { + // Extract to right side + let col_ref = right_extractor.add_extracted(expr)?; + Ok(Transformed::yes(col_ref)) + } else { + // References both sides - cannot extract, keep in place + // This shouldn't typically happen for MoveTowardsLeafNodes expressions + // but we handle it gracefully + Ok(Transformed::no(expr)) + } + } + ExpressionPlacement::Column => { + // Track columns for pass-through on appropriate side + if let Expr::Column(col) = &expr { + if column_checker.is_left_only(&expr) { + left_extractor.columns_needed.insert(col.clone()); + } else if column_checker.is_right_only(&expr) { + right_extractor.columns_needed.insert(col.clone()); + } + } + Ok(Transformed::no(expr)) + } + _ => { + // Continue recursing into children + Ok(Transformed::no(expr)) + } + } + }) +} + +/// Evaluates the columns referenced in the given expression to see if they refer +/// only to the left or right columns of a join. 
+struct ColumnChecker<'a> { + left_schema: &'a DFSchema, + left_columns: Option<std::collections::HashSet<Column>>, + right_schema: &'a DFSchema, + right_columns: Option<std::collections::HashSet<Column>>, +} + +impl<'a> ColumnChecker<'a> { + fn new(left_schema: &'a DFSchema, right_schema: &'a DFSchema) -> Self { + Self { + left_schema, + left_columns: None, + right_schema, + right_columns: None, + } + } + + /// Return true if the expression references only columns from the left side + fn is_left_only(&mut self, predicate: &Expr) -> bool { + if self.left_columns.is_none() { + self.left_columns = Some(schema_columns(self.left_schema)); + } + has_all_column_refs(predicate, self.left_columns.as_ref().unwrap()) + } + + /// Return true if the expression references only columns from the right side + fn is_right_only(&mut self, predicate: &Expr) -> bool { + if self.right_columns.is_none() { + self.right_columns = Some(schema_columns(self.right_schema)); + } + has_all_column_refs(predicate, self.right_columns.as_ref().unwrap()) + } +} + +/// Returns all columns in the schema (both qualified and unqualified forms) +fn schema_columns(schema: &DFSchema) -> std::collections::HashSet<Column> { + schema + .iter() + .flat_map(|(qualifier, field)| { + [ + Column::new(qualifier.cloned(), field.name()), + Column::new_unqualified(field.name()), + ] + }) + .collect() +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from Aggregate nodes. +/// +/// For Aggregates, we extract from: +/// - Group-by expressions (full expressions or sub-expressions) +/// - Arguments inside aggregate functions (NOT the aggregate function itself) +/// +/// Uses CSE's two-level pattern with NamePreserver for stable name handling. 
+fn extract_from_aggregate( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + let LogicalPlan::Aggregate(agg) = plan else { + return Ok(Transformed::no(plan)); + }; + + // Save original expression names using NamePreserver (like CSE) + let name_preserver = NamePreserver::new_for_projection(); + let saved_group_names: Vec<_> = agg + .group_expr + .iter() + .map(|e| name_preserver.save(e)) + .collect(); + let saved_aggr_names: Vec<_> = agg + .aggr_expr + .iter() + .map(|e| name_preserver.save(e)) + .collect(); + + // Find where to place extractions + let (target, path) = find_extraction_target(&agg.input); + let target_schema = Arc::clone(target.schema()); + + let mut extractor = + LeafExpressionExtractor::new(target_schema.as_ref(), alias_generator); + + // Extract from group-by expressions + let mut new_group_by = Vec::with_capacity(agg.group_expr.len()); + let mut has_extractions = false; + + for expr in &agg.group_expr { + let transformed = extractor.extract(expr.clone())?; + if transformed.transformed { + has_extractions = true; + } + new_group_by.push(transformed.data); + } + + // Extract from aggregate function arguments (not the function itself) + let mut new_aggr = Vec::with_capacity(agg.aggr_expr.len()); + + for expr in &agg.aggr_expr { + let transformed = extract_from_aggregate_args(expr.clone(), &mut extractor)?; + if transformed.transformed { + has_extractions = true; + } + new_aggr.push(transformed.data); + } + + if !has_extractions { + return Ok(Transformed::no(LogicalPlan::Aggregate(agg))); + } + + // Build extraction projection with ALL columns (CSE-style) + let extraction_proj = if let Some(existing_proj) = get_extracted_projection(&target) { + merge_into_extracted_projection(existing_proj, &extractor)? + } else { + extractor.build_projection_with_all_columns(target)? 
+ }; + + // Rebuild path from target back up + let rebuilt_input = rebuild_path(path, LogicalPlan::Projection(extraction_proj))?; + + // Restore names in group-by expressions using NamePreserver + let restored_group_expr: Vec<Expr> = new_group_by + .into_iter() + .zip(saved_group_names) + .map(|(expr, saved)| saved.restore(expr)) + .collect(); + + // Restore names in aggregate expressions using NamePreserver + let restored_aggr_expr: Vec<Expr> = new_aggr + .into_iter() + .zip(saved_aggr_names) + .map(|(expr, saved)| saved.restore(expr)) + .collect(); + + // Create new Aggregate with restored names + // (no outer projection needed if names are properly preserved) + let new_agg = datafusion_expr::logical_plan::Aggregate::try_new( + Arc::new(rebuilt_input), + restored_group_expr, + restored_aggr_expr, + )?; + + Ok(Transformed::yes(LogicalPlan::Aggregate(new_agg))) +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from Projection nodes. +/// +/// Uses CSE's two-level pattern (outer + inner projections only): +/// - Inner projection: extraction with ALL columns passed through +/// - Outer projection: rewritten expressions with restored names +/// +/// This avoids the unstable 3-level structure that gets broken by OptimizeProjections. +fn extract_from_projection( Review Comment: I think this needs substantial improvement ########## datafusion/optimizer/src/extract_leaf_expressions.rs: ########## @@ -0,0 +1,1870 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ExtractLeafExpressions`] extracts `MoveTowardsLeafNodes` sub-expressions into projections. +//! +//! This optimizer rule normalizes the plan so that all `MoveTowardsLeafNodes` computations +//! (like field accessors) live in Projection nodes immediately above scan nodes, making them +//! eligible for pushdown by the `OptimizeProjections` rule. +//! +//! ## Algorithm +//! +//! This rule uses **BottomUp** traversal to push ALL `MoveTowardsLeafNodes` expressions +//! (like `get_field`) to projections immediately above scan nodes. This enables optimal +//! Parquet column pruning. +//! +//! ### Node Classification +//! +//! **Barrier Nodes** (stop pushing, create projection above): +//! - `TableScan` - the leaf, ideal extraction point +//! - `Join` - requires routing to left/right sides +//! - `Aggregate` - changes schema semantics +//! - `SubqueryAlias` - scope boundary +//! - `Union`, `Intersect`, `Except` - schema boundaries +//! +//! **Schema-Preserving Nodes** (push through): +//! - `Filter` - passes all input columns through +//! - `Sort` - passes all input columns through +//! - `Limit` - passes all input columns through +//! - Passthrough `Projection` - only column references +//! +//! ### How It Works +//! +//! 1. Process leaf nodes first (TableScan, etc.) +//! 2. When processing higher nodes, descendants are already finalized +//! 3. Push extractions DOWN through the plan, merging into existing extracted +//! 
expression projections when possible + +use indexmap::{IndexMap, IndexSet}; +use std::sync::Arc; + +use datafusion_common::alias::AliasGenerator; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion_common::{Column, DFSchema, Result}; +use datafusion_expr::expr_rewriter::NamePreserver; +use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::{Expr, ExpressionPlacement, Filter, Limit, Projection, Sort}; + +use crate::optimizer::ApplyOrder; +use crate::utils::{EXTRACTED_EXPR_PREFIX, has_all_column_refs, is_extracted_expr_projection}; +use crate::{OptimizerConfig, OptimizerRule}; + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from all nodes into projections. +/// +/// This normalizes the plan so that all `MoveTowardsLeafNodes` computations (like field +/// accessors) live in Projection nodes, making them eligible for pushdown. +/// +/// # Example +/// +/// Given a filter with a struct field access: +/// +/// ```text +/// Filter: user['status'] = 'active' +/// TableScan: t [user] +/// ``` +/// +/// This rule extracts the field access into a projection: +/// +/// ```text +/// Filter: __datafusion_extracted_1 = 'active' +/// Projection: user['status'] AS __datafusion_extracted_1, user +/// TableScan: t [user] +/// ``` +/// +/// The `OptimizeProjections` rule can then push this projection down to the scan. +/// +/// **Important:** The `PushDownFilter` rule is aware of projections created by this rule +/// and will not push filters through them. See `is_extracted_expr_projection` in utils.rs. 
+#[derive(Default, Debug)] +pub struct ExtractLeafExpressions {} + +impl ExtractLeafExpressions { + /// Create a new [`ExtractLeafExpressions`] + pub fn new() -> Self { + Self {} + } +} + +impl OptimizerRule for ExtractLeafExpressions { + fn name(&self) -> &str { + "extract_leaf_expressions" + } + + fn apply_order(&self) -> Option<ApplyOrder> { + Some(ApplyOrder::BottomUp) + } + + fn rewrite( + &self, + plan: LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Result<Transformed<LogicalPlan>> { + let alias_generator = config.alias_generator(); + extract_from_plan(plan, alias_generator) + } +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from a plan node. +/// +/// With BottomUp traversal, we process leaves first, then work up. +/// This allows us to push extractions all the way down to scan nodes. +fn extract_from_plan( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + match &plan { + // Schema-preserving nodes - extract and push down + LogicalPlan::Filter(_) | LogicalPlan::Sort(_) | LogicalPlan::Limit(_) => { + extract_from_schema_preserving(plan, alias_generator) + } + + // Schema-transforming nodes need special handling + LogicalPlan::Aggregate(_) => extract_from_aggregate(plan, alias_generator), + LogicalPlan::Projection(_) => extract_from_projection(plan, alias_generator), + LogicalPlan::Join(_) => extract_from_join(plan, alias_generator), + + // Everything else passes through unchanged + _ => Ok(Transformed::no(plan)), + } +} + +/// Extracts from schema-preserving nodes (Filter, Sort, Limit). +/// +/// These nodes don't change the schema, so we can extract expressions +/// and push them down to existing extracted projections or create new ones. +/// +/// Uses CSE's two-level pattern: +/// 1. Inner extraction projection with ALL columns passed through +/// 2. 
Outer recovery projection to restore original schema +fn extract_from_schema_preserving( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + // Skip nodes with no children + if plan.inputs().is_empty() { + return Ok(Transformed::no(plan)); + } + + let input = plan.inputs()[0].clone(); + let input_schema = Arc::clone(input.schema()); + + // Find where to place extractions (look down through schema-preserving nodes) + let input_arc = Arc::new(input); + let (target, path) = find_extraction_target(&input_arc); + let target_schema = Arc::clone(target.schema()); + + // Extract using target schema - this is where the projection will be placed + let mut extractor = + LeafExpressionExtractor::new(target_schema.as_ref(), alias_generator); + + // Transform expressions + let transformed = plan.map_expressions(|expr| extractor.extract(expr))?; + + if !extractor.has_extractions() { + return Ok(transformed); + } + + // Build extraction projection with ALL columns (CSE-style) + let extraction_proj = if let Some(existing_proj) = get_extracted_projection(&target) { + merge_into_extracted_projection(existing_proj, &extractor)? + } else { + extractor.build_projection_with_all_columns(target)? + }; + + // Rebuild the path from target back up to our node's input + let rebuilt_input = rebuild_path(path, LogicalPlan::Projection(extraction_proj))?; + + // Create the node with new input + let new_inputs: Vec<LogicalPlan> = std::iter::once(rebuilt_input) + .chain( + transformed + .data + .inputs() + .iter() + .skip(1) + .map(|p| (*p).clone()), + ) + .collect(); + + let new_plan = transformed + .data + .with_new_exprs(transformed.data.expressions(), new_inputs)?; + + // Use CSE's pattern: add recovery projection to restore original schema + let recovered = build_recover_project_plan(input_schema.as_ref(), new_plan)?; + + Ok(Transformed::yes(recovered)) +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from Join nodes. 
+/// +/// For Joins, we extract from: +/// - `on` expressions: pairs of (left_key, right_key) for equijoin +/// - `filter` expression: non-equi join conditions +/// +/// Each expression is routed to the appropriate side (left or right) based on +/// which columns it references. Expressions referencing columns from both sides +/// cannot have sub-expressions extracted (they must remain in the filter). +fn extract_from_join( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + let LogicalPlan::Join(join) = plan else { + return Ok(Transformed::no(plan)); + }; + + let left_schema = join.left.schema(); + let right_schema = join.right.schema(); + + // Create extractors for left and right sides + // Find extraction targets for each side (look through schema-preserving nodes) + let (left_target, left_path) = find_extraction_target(&join.left); + let (right_target, right_path) = find_extraction_target(&join.right); + + let left_target_schema = Arc::clone(left_target.schema()); + let right_target_schema = Arc::clone(right_target.schema()); + + let mut left_extractor = + LeafExpressionExtractor::new(left_target_schema.as_ref(), alias_generator); + let mut right_extractor = + LeafExpressionExtractor::new(right_target_schema.as_ref(), alias_generator); + + // Build column checker to route expressions to correct side + let mut column_checker = ColumnChecker::new(left_schema.as_ref(), right_schema.as_ref()); + + // Extract from `on` expressions (equijoin keys) + let mut new_on = Vec::with_capacity(join.on.len()); + let mut any_extracted = false; + + for (left_key, right_key) in &join.on { + // Left key should reference only left columns + let new_left = left_extractor.extract(left_key.clone())?; + if new_left.transformed { + any_extracted = true; + } + + // Right key should reference only right columns + let new_right = right_extractor.extract(right_key.clone())?; + if new_right.transformed { + any_extracted = true; + } + + 
new_on.push((new_left.data, new_right.data)); + } + + // Extract from `filter` expression + let new_filter = if let Some(ref filter) = join.filter { + let extracted = extract_from_join_filter( + filter.clone(), + &mut column_checker, + &mut left_extractor, + &mut right_extractor, + )?; + if extracted.transformed { + any_extracted = true; + } + Some(extracted.data) + } else { + None + }; + + if !any_extracted { + return Ok(Transformed::no(LogicalPlan::Join(join))); + } + + // Save original schema before modifying inputs + let original_schema = Arc::clone(&join.schema); + + // Build left extraction projection if needed + let new_left = if left_extractor.has_extractions() { + let extraction_proj = if let Some(existing_proj) = get_extracted_projection(&left_target) { + merge_into_extracted_projection(existing_proj, &left_extractor)? + } else { + left_extractor.build_projection_with_all_columns(left_target)? + }; + Arc::new(rebuild_path(left_path, LogicalPlan::Projection(extraction_proj))?) + } else { + Arc::clone(&join.left) + }; + + // Build right extraction projection if needed + let new_right = if right_extractor.has_extractions() { + let extraction_proj = if let Some(existing_proj) = get_extracted_projection(&right_target) { + merge_into_extracted_projection(existing_proj, &right_extractor)? + } else { + right_extractor.build_projection_with_all_columns(right_target)? + }; + Arc::new(rebuild_path(right_path, LogicalPlan::Projection(extraction_proj))?) 
+ } else { + Arc::clone(&join.right) + }; + + // Create new Join with updated inputs and expressions + let new_join = datafusion_expr::logical_plan::Join::try_new( + new_left, + new_right, + new_on, + new_filter, + join.join_type, + join.join_constraint, + join.null_equality, + join.null_aware, + )?; + + // Add recovery projection to restore original schema + // This hides the intermediate extracted expression columns + let recovered = build_recover_project_plan(original_schema.as_ref(), LogicalPlan::Join(new_join))?; + + Ok(Transformed::yes(recovered)) +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from a join filter expression. +/// +/// For each sub-expression, determines if it references only left, only right, +/// or both columns, and routes extractions accordingly. +fn extract_from_join_filter( + filter: Expr, + column_checker: &mut ColumnChecker, + left_extractor: &mut LeafExpressionExtractor, + right_extractor: &mut LeafExpressionExtractor, +) -> Result<Transformed<Expr>> { + filter.transform_down(|expr| { + // Skip expressions already aliased with extracted expression pattern + if let Expr::Alias(alias) = &expr + && alias.name.starts_with(EXTRACTED_EXPR_PREFIX) + { + return Ok(Transformed { + data: expr, + transformed: false, + tnr: TreeNodeRecursion::Jump, + }); + } + + match expr.placement() { + ExpressionPlacement::MoveTowardsLeafNodes => { + // Check which side this expression belongs to + if column_checker.is_left_only(&expr) { + // Extract to left side + let col_ref = left_extractor.add_extracted(expr)?; + Ok(Transformed::yes(col_ref)) + } else if column_checker.is_right_only(&expr) { + // Extract to right side + let col_ref = right_extractor.add_extracted(expr)?; + Ok(Transformed::yes(col_ref)) + } else { + // References both sides - cannot extract, keep in place + // This shouldn't typically happen for MoveTowardsLeafNodes expressions + // but we handle it gracefully + Ok(Transformed::no(expr)) + } + } + ExpressionPlacement::Column => { 
+ // Track columns for pass-through on appropriate side + if let Expr::Column(col) = &expr { + if column_checker.is_left_only(&expr) { + left_extractor.columns_needed.insert(col.clone()); + } else if column_checker.is_right_only(&expr) { + right_extractor.columns_needed.insert(col.clone()); + } + } + Ok(Transformed::no(expr)) + } + _ => { + // Continue recursing into children + Ok(Transformed::no(expr)) + } + } + }) +} + +/// Evaluates the columns referenced in the given expression to see if they refer +/// only to the left or right columns of a join. +struct ColumnChecker<'a> { + left_schema: &'a DFSchema, + left_columns: Option<std::collections::HashSet<Column>>, + right_schema: &'a DFSchema, + right_columns: Option<std::collections::HashSet<Column>>, +} + +impl<'a> ColumnChecker<'a> { + fn new(left_schema: &'a DFSchema, right_schema: &'a DFSchema) -> Self { + Self { + left_schema, + left_columns: None, + right_schema, + right_columns: None, + } + } + + /// Return true if the expression references only columns from the left side + fn is_left_only(&mut self, predicate: &Expr) -> bool { + if self.left_columns.is_none() { + self.left_columns = Some(schema_columns(self.left_schema)); + } + has_all_column_refs(predicate, self.left_columns.as_ref().unwrap()) + } + + /// Return true if the expression references only columns from the right side + fn is_right_only(&mut self, predicate: &Expr) -> bool { + if self.right_columns.is_none() { + self.right_columns = Some(schema_columns(self.right_schema)); + } + has_all_column_refs(predicate, self.right_columns.as_ref().unwrap()) + } +} + +/// Returns all columns in the schema (both qualified and unqualified forms) +fn schema_columns(schema: &DFSchema) -> std::collections::HashSet<Column> { + schema + .iter() + .flat_map(|(qualifier, field)| { + [ + Column::new(qualifier.cloned(), field.name()), + Column::new_unqualified(field.name()), + ] + }) + .collect() +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from 
Aggregate nodes. +/// +/// For Aggregates, we extract from: +/// - Group-by expressions (full expressions or sub-expressions) +/// - Arguments inside aggregate functions (NOT the aggregate function itself) +/// +/// Uses CSE's two-level pattern with NamePreserver for stable name handling. +fn extract_from_aggregate( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + let LogicalPlan::Aggregate(agg) = plan else { + return Ok(Transformed::no(plan)); + }; + + // Save original expression names using NamePreserver (like CSE) + let name_preserver = NamePreserver::new_for_projection(); + let saved_group_names: Vec<_> = agg + .group_expr + .iter() + .map(|e| name_preserver.save(e)) + .collect(); + let saved_aggr_names: Vec<_> = agg + .aggr_expr + .iter() + .map(|e| name_preserver.save(e)) + .collect(); + + // Find where to place extractions + let (target, path) = find_extraction_target(&agg.input); + let target_schema = Arc::clone(target.schema()); + + let mut extractor = + LeafExpressionExtractor::new(target_schema.as_ref(), alias_generator); + + // Extract from group-by expressions + let mut new_group_by = Vec::with_capacity(agg.group_expr.len()); + let mut has_extractions = false; + + for expr in &agg.group_expr { + let transformed = extractor.extract(expr.clone())?; + if transformed.transformed { + has_extractions = true; + } + new_group_by.push(transformed.data); + } + + // Extract from aggregate function arguments (not the function itself) + let mut new_aggr = Vec::with_capacity(agg.aggr_expr.len()); + + for expr in &agg.aggr_expr { + let transformed = extract_from_aggregate_args(expr.clone(), &mut extractor)?; + if transformed.transformed { + has_extractions = true; + } + new_aggr.push(transformed.data); + } + + if !has_extractions { + return Ok(Transformed::no(LogicalPlan::Aggregate(agg))); + } + + // Build extraction projection with ALL columns (CSE-style) + let extraction_proj = if let Some(existing_proj) = 
get_extracted_projection(&target) { + merge_into_extracted_projection(existing_proj, &extractor)? + } else { + extractor.build_projection_with_all_columns(target)? + }; + + // Rebuild path from target back up + let rebuilt_input = rebuild_path(path, LogicalPlan::Projection(extraction_proj))?; + + // Restore names in group-by expressions using NamePreserver + let restored_group_expr: Vec<Expr> = new_group_by + .into_iter() + .zip(saved_group_names) + .map(|(expr, saved)| saved.restore(expr)) + .collect(); + + // Restore names in aggregate expressions using NamePreserver + let restored_aggr_expr: Vec<Expr> = new_aggr + .into_iter() + .zip(saved_aggr_names) + .map(|(expr, saved)| saved.restore(expr)) + .collect(); + + // Create new Aggregate with restored names + // (no outer projection needed if names are properly preserved) + let new_agg = datafusion_expr::logical_plan::Aggregate::try_new( + Arc::new(rebuilt_input), + restored_group_expr, + restored_aggr_expr, + )?; + + Ok(Transformed::yes(LogicalPlan::Aggregate(new_agg))) +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from Projection nodes. +/// +/// Uses CSE's two-level pattern (outer + inner projections only): +/// - Inner projection: extraction with ALL columns passed through +/// - Outer projection: rewritten expressions with restored names +/// +/// This avoids the unstable 3-level structure that gets broken by OptimizeProjections. +fn extract_from_projection( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + let LogicalPlan::Projection(proj) = plan else { + return Ok(Transformed::no(plan)); + }; + + // Skip if this projection is fully extracted (only column references) + if is_fully_extracted(&proj) { + return Ok(Transformed::no(LogicalPlan::Projection(proj))); + } + + // Skip if this is already an extracted expression projection. + // This prevents re-extraction on subsequent optimizer passes. 
+ if is_extracted_expr_projection(&proj) { + return Ok(Transformed::no(LogicalPlan::Projection(proj))); Review Comment: I feel we can probably do better than this. I don't love the idea of identifying an already extracted projection based on every expression having an alias with a certain pattern. I think we should be able to ask "is there anything to extract?" and then "can we push this down?" instead, i.e. do the work but make it idempotent (does not change the plan) if there's no change. ########## datafusion/optimizer/src/extract_leaf_expressions.rs: ########## @@ -0,0 +1,1870 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`ExtractLeafExpressions`] extracts `MoveTowardsLeafNodes` sub-expressions into projections. +//! +//! This optimizer rule normalizes the plan so that all `MoveTowardsLeafNodes` computations +//! (like field accessors) live in Projection nodes immediately above scan nodes, making them +//! eligible for pushdown by the `OptimizeProjections` rule. +//! +//! ## Algorithm +//! +//! This rule uses **BottomUp** traversal to push ALL `MoveTowardsLeafNodes` expressions +//! (like `get_field`) to projections immediately above scan nodes. This enables optimal +//! Parquet column pruning. +//! 
+//! ### Node Classification +//! +//! **Barrier Nodes** (stop pushing, create projection above): +//! - `TableScan` - the leaf, ideal extraction point +//! - `Join` - requires routing to left/right sides +//! - `Aggregate` - changes schema semantics +//! - `SubqueryAlias` - scope boundary +//! - `Union`, `Intersect`, `Except` - schema boundaries +//! +//! **Schema-Preserving Nodes** (push through): +//! - `Filter` - passes all input columns through +//! - `Sort` - passes all input columns through +//! - `Limit` - passes all input columns through +//! - Passthrough `Projection` - only column references +//! +//! ### How It Works +//! +//! 1. Process leaf nodes first (TableScan, etc.) +//! 2. When processing higher nodes, descendants are already finalized +//! 3. Push extractions DOWN through the plan, merging into existing extracted +//! expression projections when possible + +use indexmap::{IndexMap, IndexSet}; +use std::sync::Arc; + +use datafusion_common::alias::AliasGenerator; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion_common::{Column, DFSchema, Result}; +use datafusion_expr::expr_rewriter::NamePreserver; +use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::{Expr, ExpressionPlacement, Filter, Limit, Projection, Sort}; + +use crate::optimizer::ApplyOrder; +use crate::utils::{EXTRACTED_EXPR_PREFIX, has_all_column_refs, is_extracted_expr_projection}; +use crate::{OptimizerConfig, OptimizerRule}; + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from all nodes into projections. +/// +/// This normalizes the plan so that all `MoveTowardsLeafNodes` computations (like field +/// accessors) live in Projection nodes, making them eligible for pushdown. 
+/// +/// # Example +/// +/// Given a filter with a struct field access: +/// +/// ```text +/// Filter: user['status'] = 'active' +/// TableScan: t [user] +/// ``` +/// +/// This rule extracts the field access into a projection: +/// +/// ```text +/// Filter: __datafusion_extracted_1 = 'active' +/// Projection: user['status'] AS __datafusion_extracted_1, user +/// TableScan: t [user] +/// ``` +/// +/// The `OptimizeProjections` rule can then push this projection down to the scan. +/// +/// **Important:** The `PushDownFilter` rule is aware of projections created by this rule +/// and will not push filters through them. See `is_extracted_expr_projection` in utils.rs. +#[derive(Default, Debug)] +pub struct ExtractLeafExpressions {} + +impl ExtractLeafExpressions { + /// Create a new [`ExtractLeafExpressions`] + pub fn new() -> Self { + Self {} + } +} + +impl OptimizerRule for ExtractLeafExpressions { + fn name(&self) -> &str { + "extract_leaf_expressions" + } + + fn apply_order(&self) -> Option<ApplyOrder> { + Some(ApplyOrder::BottomUp) + } + + fn rewrite( + &self, + plan: LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Result<Transformed<LogicalPlan>> { + let alias_generator = config.alias_generator(); + extract_from_plan(plan, alias_generator) + } +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from a plan node. +/// +/// With BottomUp traversal, we process leaves first, then work up. +/// This allows us to push extractions all the way down to scan nodes. 
+fn extract_from_plan( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + match &plan { + // Schema-preserving nodes - extract and push down + LogicalPlan::Filter(_) | LogicalPlan::Sort(_) | LogicalPlan::Limit(_) => { + extract_from_schema_preserving(plan, alias_generator) + } + + // Schema-transforming nodes need special handling + LogicalPlan::Aggregate(_) => extract_from_aggregate(plan, alias_generator), + LogicalPlan::Projection(_) => extract_from_projection(plan, alias_generator), + LogicalPlan::Join(_) => extract_from_join(plan, alias_generator), + + // Everything else passes through unchanged + _ => Ok(Transformed::no(plan)), + } +} + +/// Extracts from schema-preserving nodes (Filter, Sort, Limit). +/// +/// These nodes don't change the schema, so we can extract expressions +/// and push them down to existing extracted projections or create new ones. +/// +/// Uses CSE's two-level pattern: +/// 1. Inner extraction projection with ALL columns passed through +/// 2. 
Outer recovery projection to restore original schema +fn extract_from_schema_preserving( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + // Skip nodes with no children + if plan.inputs().is_empty() { + return Ok(Transformed::no(plan)); + } + + let input = plan.inputs()[0].clone(); + let input_schema = Arc::clone(input.schema()); + + // Find where to place extractions (look down through schema-preserving nodes) + let input_arc = Arc::new(input); + let (target, path) = find_extraction_target(&input_arc); + let target_schema = Arc::clone(target.schema()); + + // Extract using target schema - this is where the projection will be placed + let mut extractor = + LeafExpressionExtractor::new(target_schema.as_ref(), alias_generator); + + // Transform expressions + let transformed = plan.map_expressions(|expr| extractor.extract(expr))?; + + if !extractor.has_extractions() { + return Ok(transformed); + } + + // Build extraction projection with ALL columns (CSE-style) + let extraction_proj = if let Some(existing_proj) = get_extracted_projection(&target) { + merge_into_extracted_projection(existing_proj, &extractor)? + } else { + extractor.build_projection_with_all_columns(target)? + }; + + // Rebuild the path from target back up to our node's input + let rebuilt_input = rebuild_path(path, LogicalPlan::Projection(extraction_proj))?; + + // Create the node with new input + let new_inputs: Vec<LogicalPlan> = std::iter::once(rebuilt_input) + .chain( + transformed + .data + .inputs() + .iter() + .skip(1) + .map(|p| (*p).clone()), + ) + .collect(); + + let new_plan = transformed + .data + .with_new_exprs(transformed.data.expressions(), new_inputs)?; + + // Use CSE's pattern: add recovery projection to restore original schema + let recovered = build_recover_project_plan(input_schema.as_ref(), new_plan)?; + + Ok(Transformed::yes(recovered)) +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from Join nodes. 
+/// +/// For Joins, we extract from: +/// - `on` expressions: pairs of (left_key, right_key) for equijoin +/// - `filter` expression: non-equi join conditions +/// +/// Each expression is routed to the appropriate side (left or right) based on +/// which columns it references. Expressions referencing columns from both sides +/// cannot have sub-expressions extracted (they must remain in the filter). +fn extract_from_join( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + let LogicalPlan::Join(join) = plan else { + return Ok(Transformed::no(plan)); + }; + + let left_schema = join.left.schema(); + let right_schema = join.right.schema(); + + // Create extractors for left and right sides + // Find extraction targets for each side (look through schema-preserving nodes) + let (left_target, left_path) = find_extraction_target(&join.left); + let (right_target, right_path) = find_extraction_target(&join.right); + + let left_target_schema = Arc::clone(left_target.schema()); + let right_target_schema = Arc::clone(right_target.schema()); + + let mut left_extractor = + LeafExpressionExtractor::new(left_target_schema.as_ref(), alias_generator); + let mut right_extractor = + LeafExpressionExtractor::new(right_target_schema.as_ref(), alias_generator); + + // Build column checker to route expressions to correct side + let mut column_checker = ColumnChecker::new(left_schema.as_ref(), right_schema.as_ref()); + + // Extract from `on` expressions (equijoin keys) + let mut new_on = Vec::with_capacity(join.on.len()); + let mut any_extracted = false; + + for (left_key, right_key) in &join.on { + // Left key should reference only left columns + let new_left = left_extractor.extract(left_key.clone())?; + if new_left.transformed { + any_extracted = true; + } + + // Right key should reference only right columns + let new_right = right_extractor.extract(right_key.clone())?; + if new_right.transformed { + any_extracted = true; + } + + 
new_on.push((new_left.data, new_right.data)); + } + + // Extract from `filter` expression + let new_filter = if let Some(ref filter) = join.filter { + let extracted = extract_from_join_filter( + filter.clone(), + &mut column_checker, + &mut left_extractor, + &mut right_extractor, + )?; + if extracted.transformed { + any_extracted = true; + } + Some(extracted.data) + } else { + None + }; + + if !any_extracted { + return Ok(Transformed::no(LogicalPlan::Join(join))); + } + + // Save original schema before modifying inputs + let original_schema = Arc::clone(&join.schema); + + // Build left extraction projection if needed + let new_left = if left_extractor.has_extractions() { + let extraction_proj = if let Some(existing_proj) = get_extracted_projection(&left_target) { + merge_into_extracted_projection(existing_proj, &left_extractor)? + } else { + left_extractor.build_projection_with_all_columns(left_target)? + }; + Arc::new(rebuild_path(left_path, LogicalPlan::Projection(extraction_proj))?) + } else { + Arc::clone(&join.left) + }; + + // Build right extraction projection if needed + let new_right = if right_extractor.has_extractions() { + let extraction_proj = if let Some(existing_proj) = get_extracted_projection(&right_target) { + merge_into_extracted_projection(existing_proj, &right_extractor)? + } else { + right_extractor.build_projection_with_all_columns(right_target)? + }; + Arc::new(rebuild_path(right_path, LogicalPlan::Projection(extraction_proj))?) 
+ } else { + Arc::clone(&join.right) + }; + + // Create new Join with updated inputs and expressions + let new_join = datafusion_expr::logical_plan::Join::try_new( + new_left, + new_right, + new_on, + new_filter, + join.join_type, + join.join_constraint, + join.null_equality, + join.null_aware, + )?; + + // Add recovery projection to restore original schema + // This hides the intermediate extracted expression columns + let recovered = build_recover_project_plan(original_schema.as_ref(), LogicalPlan::Join(new_join))?; + + Ok(Transformed::yes(recovered)) +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from a join filter expression. +/// +/// For each sub-expression, determines if it references only left, only right, +/// or both columns, and routes extractions accordingly. +fn extract_from_join_filter( + filter: Expr, + column_checker: &mut ColumnChecker, + left_extractor: &mut LeafExpressionExtractor, + right_extractor: &mut LeafExpressionExtractor, +) -> Result<Transformed<Expr>> { + filter.transform_down(|expr| { + // Skip expressions already aliased with extracted expression pattern + if let Expr::Alias(alias) = &expr + && alias.name.starts_with(EXTRACTED_EXPR_PREFIX) + { + return Ok(Transformed { + data: expr, + transformed: false, + tnr: TreeNodeRecursion::Jump, + }); + } + + match expr.placement() { + ExpressionPlacement::MoveTowardsLeafNodes => { + // Check which side this expression belongs to + if column_checker.is_left_only(&expr) { + // Extract to left side + let col_ref = left_extractor.add_extracted(expr)?; + Ok(Transformed::yes(col_ref)) + } else if column_checker.is_right_only(&expr) { + // Extract to right side + let col_ref = right_extractor.add_extracted(expr)?; + Ok(Transformed::yes(col_ref)) + } else { + // References both sides - cannot extract, keep in place + // This shouldn't typically happen for MoveTowardsLeafNodes expressions + // but we handle it gracefully + Ok(Transformed::no(expr)) + } + } + ExpressionPlacement::Column => { 
+ // Track columns for pass-through on appropriate side + if let Expr::Column(col) = &expr { + if column_checker.is_left_only(&expr) { + left_extractor.columns_needed.insert(col.clone()); + } else if column_checker.is_right_only(&expr) { + right_extractor.columns_needed.insert(col.clone()); + } + } + Ok(Transformed::no(expr)) + } + _ => { + // Continue recursing into children + Ok(Transformed::no(expr)) + } + } + }) +} + +/// Evaluates the columns referenced in the given expression to see if they refer +/// only to the left or right columns of a join. +struct ColumnChecker<'a> { + left_schema: &'a DFSchema, + left_columns: Option<std::collections::HashSet<Column>>, + right_schema: &'a DFSchema, + right_columns: Option<std::collections::HashSet<Column>>, +} + +impl<'a> ColumnChecker<'a> { + fn new(left_schema: &'a DFSchema, right_schema: &'a DFSchema) -> Self { + Self { + left_schema, + left_columns: None, + right_schema, + right_columns: None, + } + } + + /// Return true if the expression references only columns from the left side + fn is_left_only(&mut self, predicate: &Expr) -> bool { + if self.left_columns.is_none() { + self.left_columns = Some(schema_columns(self.left_schema)); + } + has_all_column_refs(predicate, self.left_columns.as_ref().unwrap()) + } + + /// Return true if the expression references only columns from the right side + fn is_right_only(&mut self, predicate: &Expr) -> bool { + if self.right_columns.is_none() { + self.right_columns = Some(schema_columns(self.right_schema)); + } + has_all_column_refs(predicate, self.right_columns.as_ref().unwrap()) + } +} + +/// Returns all columns in the schema (both qualified and unqualified forms) +fn schema_columns(schema: &DFSchema) -> std::collections::HashSet<Column> { + schema + .iter() + .flat_map(|(qualifier, field)| { + [ + Column::new(qualifier.cloned(), field.name()), + Column::new_unqualified(field.name()), + ] + }) + .collect() +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from 
Aggregate nodes. +/// +/// For Aggregates, we extract from: +/// - Group-by expressions (full expressions or sub-expressions) +/// - Arguments inside aggregate functions (NOT the aggregate function itself) +/// +/// Uses CSE's two-level pattern with NamePreserver for stable name handling. +fn extract_from_aggregate( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + let LogicalPlan::Aggregate(agg) = plan else { + return Ok(Transformed::no(plan)); + }; + + // Save original expression names using NamePreserver (like CSE) + let name_preserver = NamePreserver::new_for_projection(); + let saved_group_names: Vec<_> = agg + .group_expr + .iter() + .map(|e| name_preserver.save(e)) + .collect(); + let saved_aggr_names: Vec<_> = agg + .aggr_expr + .iter() + .map(|e| name_preserver.save(e)) + .collect(); + + // Find where to place extractions + let (target, path) = find_extraction_target(&agg.input); + let target_schema = Arc::clone(target.schema()); + + let mut extractor = + LeafExpressionExtractor::new(target_schema.as_ref(), alias_generator); + + // Extract from group-by expressions + let mut new_group_by = Vec::with_capacity(agg.group_expr.len()); + let mut has_extractions = false; + + for expr in &agg.group_expr { + let transformed = extractor.extract(expr.clone())?; + if transformed.transformed { + has_extractions = true; + } + new_group_by.push(transformed.data); + } + + // Extract from aggregate function arguments (not the function itself) + let mut new_aggr = Vec::with_capacity(agg.aggr_expr.len()); + + for expr in &agg.aggr_expr { + let transformed = extract_from_aggregate_args(expr.clone(), &mut extractor)?; + if transformed.transformed { + has_extractions = true; + } + new_aggr.push(transformed.data); + } + + if !has_extractions { + return Ok(Transformed::no(LogicalPlan::Aggregate(agg))); + } + + // Build extraction projection with ALL columns (CSE-style) + let extraction_proj = if let Some(existing_proj) = 
get_extracted_projection(&target) { + merge_into_extracted_projection(existing_proj, &extractor)? + } else { + extractor.build_projection_with_all_columns(target)? + }; + + // Rebuild path from target back up + let rebuilt_input = rebuild_path(path, LogicalPlan::Projection(extraction_proj))?; + + // Restore names in group-by expressions using NamePreserver + let restored_group_expr: Vec<Expr> = new_group_by + .into_iter() + .zip(saved_group_names) + .map(|(expr, saved)| saved.restore(expr)) + .collect(); + + // Restore names in aggregate expressions using NamePreserver + let restored_aggr_expr: Vec<Expr> = new_aggr + .into_iter() + .zip(saved_aggr_names) + .map(|(expr, saved)| saved.restore(expr)) + .collect(); + + // Create new Aggregate with restored names + // (no outer projection needed if names are properly preserved) + let new_agg = datafusion_expr::logical_plan::Aggregate::try_new( + Arc::new(rebuilt_input), + restored_group_expr, + restored_aggr_expr, + )?; + + Ok(Transformed::yes(LogicalPlan::Aggregate(new_agg))) +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from Projection nodes. +/// +/// Uses CSE's two-level pattern (outer + inner projections only): +/// - Inner projection: extraction with ALL columns passed through +/// - Outer projection: rewritten expressions with restored names +/// +/// This avoids the unstable 3-level structure that gets broken by OptimizeProjections. +fn extract_from_projection( + plan: LogicalPlan, + alias_generator: &Arc<AliasGenerator>, +) -> Result<Transformed<LogicalPlan>> { + let LogicalPlan::Projection(proj) = plan else { + return Ok(Transformed::no(plan)); + }; + + // Skip if this projection is fully extracted (only column references) + if is_fully_extracted(&proj) { + return Ok(Transformed::no(LogicalPlan::Projection(proj))); + } + + // Skip if this is already an extracted expression projection. + // This prevents re-extraction on subsequent optimizer passes. 
+ if is_extracted_expr_projection(&proj) { + return Ok(Transformed::no(LogicalPlan::Projection(proj))); + } + + // Save original expression names using NamePreserver (like CSE) + let name_preserver = NamePreserver::new_for_projection(); + let saved_names: Vec<_> = proj.expr.iter().map(|e| name_preserver.save(e)).collect(); + + // Find where to place extractions (look down through schema-preserving nodes) + let (target, path) = find_extraction_target(&proj.input); + let target_schema = Arc::clone(target.schema()); + + let mut extractor = + LeafExpressionExtractor::new(target_schema.as_ref(), alias_generator); + + // Extract from projection expressions + let mut new_exprs = Vec::with_capacity(proj.expr.len()); + let mut has_extractions = false; + + for expr in &proj.expr { + let transformed = extractor.extract(expr.clone())?; + if transformed.transformed { + has_extractions = true; + } + new_exprs.push(transformed.data); + } + + if !has_extractions { + return Ok(Transformed::no(LogicalPlan::Projection(proj))); + } + + // Build extraction projection with ALL columns (CSE-style) + let extraction_proj = if let Some(existing_proj) = get_extracted_projection(&target) { + merge_into_extracted_projection(existing_proj, &extractor)? + } else { + extractor.build_projection_with_all_columns(target)? + }; + + // Rebuild path from target back up + let rebuilt_input = rebuild_path(path, LogicalPlan::Projection(extraction_proj))?; + + // Create outer projection with rewritten exprs + restored names + let final_exprs: Vec<Expr> = new_exprs + .into_iter() + .zip(saved_names) + .map(|(expr, saved_name)| saved_name.restore(expr)) + .collect(); + + let outer_projection = Projection::try_new(final_exprs, Arc::new(rebuilt_input))?; + + Ok(Transformed::yes(LogicalPlan::Projection(outer_projection))) +} + +/// Extracts `MoveTowardsLeafNodes` sub-expressions from aggregate function arguments. 
+/// +/// This extracts from inside the aggregate (e.g., from `sum(get_field(x, 'y'))` +/// we extract `get_field(x, 'y')`), but NOT the aggregate function itself. +fn extract_from_aggregate_args( + expr: Expr, + extractor: &mut LeafExpressionExtractor, +) -> Result<Transformed<Expr>> { + match expr { + Expr::AggregateFunction(mut agg_func) => { + // Extract from arguments, not the function itself + let mut any_changed = false; + let mut new_args = Vec::with_capacity(agg_func.params.args.len()); + + for arg in agg_func.params.args { + let transformed = extractor.extract(arg)?; + if transformed.transformed { + any_changed = true; + } + new_args.push(transformed.data); + } + + if any_changed { + agg_func.params.args = new_args; + Ok(Transformed::yes(Expr::AggregateFunction(agg_func))) + } else { + agg_func.params.args = new_args; + Ok(Transformed::no(Expr::AggregateFunction(agg_func))) + } + } + // For aliased aggregates, process the inner expression + Expr::Alias(alias) => { + let transformed = extract_from_aggregate_args(*alias.expr, extractor)?; + Ok( + transformed + .update_data(|e| e.alias_qualified(alias.relation, alias.name)), + ) + } + // For other expressions, use regular extraction + other => extractor.extract(other), + } +} + +// ============================================================================= +// Helper Functions for BottomUp Traversal +// ============================================================================= + +/// Traverses down through schema-preserving nodes to find where to place extractions. 
+/// +/// Returns (target_node, path_to_rebuild) where: +/// - target_node: the node above which to create extraction projection +/// - path_to_rebuild: nodes between our input and target that must be rebuilt +/// +/// Schema-preserving nodes that we can look through: +/// - Filter, Sort, Limit: pass all input columns through unchanged +/// - Passthrough projections: only column references +/// +/// Barrier nodes where we stop: +/// - TableScan, Join, Aggregate: these are extraction targets +/// - Existing extracted expression projections: we merge into these +/// - Any other node type +fn find_extraction_target( + input: &Arc<LogicalPlan>, +) -> (Arc<LogicalPlan>, Vec<LogicalPlan>) { + let mut current = Arc::clone(input); + let mut path = vec![]; + + loop { + match current.as_ref() { + // Look through schema-preserving nodes + LogicalPlan::Filter(f) => { + path.push(current.as_ref().clone()); + current = Arc::clone(&f.input); + } + LogicalPlan::Sort(s) => { + path.push(current.as_ref().clone()); + current = Arc::clone(&s.input); + } + LogicalPlan::Limit(l) => { + path.push(current.as_ref().clone()); + current = Arc::clone(&l.input); + } + // Look through passthrough projections (only column references) + LogicalPlan::Projection(p) if is_passthrough_projection(p) => { + path.push(current.as_ref().clone()); + current = Arc::clone(&p.input); + } + // Found existing extracted expression projection - will merge into it + LogicalPlan::Projection(p) if is_extracted_expr_projection(p) => { + return (current, path); + } + // Hit a barrier node - create new projection here + _ => { + return (current, path); + } + } + } +} + +/// Returns true if the projection is a passthrough (only column references). +fn is_passthrough_projection(proj: &Projection) -> bool { + proj.expr.iter().all(|e| matches!(e, Expr::Column(_))) +} + +/// Returns true if the projection only has column references (nothing to extract). 
+fn is_fully_extracted(proj: &Projection) -> bool { + proj.expr.iter().all(|e| { + matches!(e, Expr::Column(_)) + || matches!(e, Expr::Alias(a) if matches!(a.expr.as_ref(), Expr::Column(_))) + }) +} + +/// If the target is an extracted expression projection, return it for merging. +fn get_extracted_projection(target: &Arc<LogicalPlan>) -> Option<&Projection> { + if let LogicalPlan::Projection(p) = target.as_ref() + && is_extracted_expr_projection(p) + { + return Some(p); + } + None +} + +/// Merges new extractions into an existing extracted expression projection. +fn merge_into_extracted_projection( + existing: &Projection, + extractor: &LeafExpressionExtractor, +) -> Result<Projection> { + let mut proj_exprs = existing.expr.clone(); + + // Build a map of existing expressions (by schema_name) to their aliases + let existing_extractions: IndexMap<String, String> = existing + .expr + .iter() + .filter_map(|e| { + if let Expr::Alias(alias) = e + && alias.name.starts_with(EXTRACTED_EXPR_PREFIX) + { + let schema_name = alias.expr.schema_name().to_string(); + return Some((schema_name, alias.name.clone())); + } + None + }) + .collect(); + + // Add new extracted expressions, but only if not already present + for (schema_name, (expr, alias)) in &extractor.extracted { + if !existing_extractions.contains_key(schema_name) { + proj_exprs.push(expr.clone().alias(alias)); + } + } + + // Add any new pass-through columns that aren't already in the projection + let existing_cols: IndexSet<Column> = existing + .expr + .iter() + .filter_map(|e| { + if let Expr::Column(c) = e { + Some(c.clone()) + } else { + None + } + }) + .collect(); + + for col in &extractor.columns_needed { + if !existing_cols.contains(col) && extractor.input_schema.has_column(col) { + proj_exprs.push(Expr::Column(col.clone())); + } + } + + Projection::try_new(proj_exprs, Arc::clone(&existing.input)) +} + +/// Rebuilds the path from extraction projection back up to original input. 
+/// +/// Takes a list of nodes (in top-to-bottom order from input towards target) +/// and rebuilds them with the new bottom input. +/// +/// For passthrough projections, we update them to include ALL columns from +/// the new input (including any new extracted expression columns that were merged). +fn rebuild_path(path: Vec<LogicalPlan>, new_bottom: LogicalPlan) -> Result<LogicalPlan> { +    let mut current = new_bottom; + +    // Rebuild path from bottom to top (reverse order) +    for node in path.into_iter().rev() { +        current = match node { +            LogicalPlan::Filter(f) => { +                LogicalPlan::Filter(Filter::try_new(f.predicate, Arc::new(current))?) +            } +            LogicalPlan::Sort(s) => LogicalPlan::Sort(Sort { +                expr: s.expr, +                input: Arc::new(current), +                fetch: s.fetch, +            }), +            LogicalPlan::Limit(l) => LogicalPlan::Limit(Limit { +                skip: l.skip, +                fetch: l.fetch, +                input: Arc::new(current), +            }), +            LogicalPlan::Projection(p) if is_passthrough_projection(&p) => { Review Comment: We should be able to handle non-passthrough projections -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
