Copilot commented on code in PR #20117:
URL: https://github.com/apache/datafusion/pull/20117#discussion_r2756235399
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -1291,10 +1291,38 @@ impl OptimizerRule for PushDownFilter {
/// Filter(foo=5)
/// ...
/// ```
+/// Check if a projection is a `__leaf_*` extraction projection
+/// (created by ExtractLeafExpressions).
+///
+/// These projections should not have filters pushed through them because doing so
+/// would rewrite the filter expressions back to their original form (e.g., rewriting
+/// `__leaf_1 > 150` back to `get_field(s,'value') > 150`), which undoes the extraction
+/// and prevents proper pushdown of field access expressions.
+fn is_leaf_extraction_projection(proj: &Projection) -> bool {
+ proj.expr.iter().any(|e| {
+ if let Expr::Alias(alias) = e {
+ alias.name.starts_with("__leaf")
Review Comment:
The magic string `__leaf` is duplicated in multiple locations throughout the
codebase. Consider defining it as a constant (e.g., `const LEAF_ALIAS_PREFIX:
&str = "__leaf";`) to ensure consistency and make future changes easier.
##########
datafusion/optimizer/src/extract_leaf_expressions.rs:
##########
@@ -0,0 +1,1059 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ExtractLeafExpressions`] extracts `MoveTowardsLeafNodes` sub-expressions into projections.
+//!
+//! This optimizer rule normalizes the plan so that all `MoveTowardsLeafNodes` computations
+//! (like field accessors) live in Projection nodes immediately above scan nodes, making them
+//! eligible for pushdown by the `OptimizeProjections` rule.
+//!
+//! ## Algorithm
+//!
+//! This rule uses **BottomUp** traversal to push ALL `MoveTowardsLeafNodes` expressions
+//! (like `get_field`) to projections immediately above scan nodes. This enables optimal
+//! Parquet column pruning.
+//!
+//! ### Node Classification
+//!
+//! **Barrier Nodes** (stop pushing, create projection above):
+//! - `TableScan` - the leaf, ideal extraction point
+//! - `Join` - requires routing to left/right sides
+//! - `Aggregate` - changes schema semantics
+//! - `SubqueryAlias` - scope boundary
+//! - `Union`, `Intersect`, `Except` - schema boundaries
+//!
+//! **Schema-Preserving Nodes** (push through):
+//! - `Filter` - passes all input columns through
+//! - `Sort` - passes all input columns through
+//! - `Limit` - passes all input columns through
+//! - Passthrough `Projection` - only column references
+//!
+//! ### How It Works
+//!
+//! 1. Process leaf nodes first (TableScan, etc.)
+//! 2. When processing higher nodes, descendants are already finalized
+//! 3. Push extractions DOWN through the plan, merging into existing `__leaf_*`
+//! projections when possible
+
+use indexmap::{IndexMap, IndexSet};
+use std::sync::Arc;
+
+use datafusion_common::alias::AliasGenerator;
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::expr_rewriter::NamePreserver;
+use datafusion_expr::logical_plan::LogicalPlan;
+use datafusion_expr::{Expr, ExpressionPlacement, Filter, Limit, Projection,
Sort};
+
+use crate::optimizer::ApplyOrder;
+use crate::{OptimizerConfig, OptimizerRule};
+
/// Moves `MoveTowardsLeafNodes` sub-expressions (such as struct field accessors)
/// out of the nodes that use them and into projections.
///
/// After this rule runs, those computations live in Projection nodes, where they
/// become eligible for pushdown.
///
/// # Example
///
/// A filter that reads a struct field:
///
/// ```text
/// Filter: user['status'] = 'active'
///   TableScan: t [user]
/// ```
///
/// is rewritten to filter on an extracted column instead:
///
/// ```text
/// Filter: __leaf_1 = 'active'
///   Projection: user['status'] AS __leaf_1, user
///     TableScan: t [user]
/// ```
///
/// The `OptimizeProjections` rule can then push the new projection into the scan.
#[derive(Default, Debug)]
pub struct ExtractLeafExpressions {}

impl ExtractLeafExpressions {
    /// Returns a new instance of the rule (same as [`Default::default`]).
    pub fn new() -> Self {
        Self::default()
    }
}
+
+impl OptimizerRule for ExtractLeafExpressions {
+ fn name(&self) -> &str {
+ "extract_leaf_expressions"
+ }
+
+ fn apply_order(&self) -> Option<ApplyOrder> {
+ Some(ApplyOrder::BottomUp)
+ }
+
+ fn rewrite(
+ &self,
+ plan: LogicalPlan,
+ config: &dyn OptimizerConfig,
+ ) -> Result<Transformed<LogicalPlan>> {
+ let alias_generator = config.alias_generator();
+ extract_from_plan(plan, alias_generator)
+ }
+}
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from a plan node.
+///
+/// With BottomUp traversal, we process leaves first, then work up.
+/// This allows us to push extractions all the way down to scan nodes.
+fn extract_from_plan(
+ plan: LogicalPlan,
+ alias_generator: &Arc<AliasGenerator>,
+) -> Result<Transformed<LogicalPlan>> {
+ match &plan {
+ // Schema-preserving nodes - extract and push down
+ LogicalPlan::Filter(_) | LogicalPlan::Sort(_) | LogicalPlan::Limit(_)
=> {
+ extract_from_schema_preserving(plan, alias_generator)
+ }
+
+ // Schema-transforming nodes need special handling
+ LogicalPlan::Aggregate(_) => extract_from_aggregate(plan,
alias_generator),
+ LogicalPlan::Projection(_) => extract_from_projection(plan,
alias_generator),
+
+ // Everything else passes through unchanged
+ _ => Ok(Transformed::no(plan)),
+ }
+}
+
+/// Extracts from schema-preserving nodes (Filter, Sort, Limit).
+///
+/// These nodes don't change the schema, so we can extract expressions
+/// and push them down to existing leaf projections or create new ones.
+///
+/// Uses CSE's two-level pattern:
+/// 1. Inner extraction projection with ALL columns passed through
+/// 2. Outer recovery projection to restore original schema
+fn extract_from_schema_preserving(
+ plan: LogicalPlan,
+ alias_generator: &Arc<AliasGenerator>,
+) -> Result<Transformed<LogicalPlan>> {
+ // Skip nodes with no children
+ if plan.inputs().is_empty() {
+ return Ok(Transformed::no(plan));
+ }
+
+ let input = plan.inputs()[0].clone();
+ let input_schema = Arc::clone(input.schema());
+
+ // Find where to place extractions (look down through schema-preserving
nodes)
+ let input_arc = Arc::new(input);
+ let (target, path) = find_extraction_target(&input_arc);
+ let target_schema = Arc::clone(target.schema());
+
+ // Extract using target schema - this is where the projection will be
placed
+ let mut extractor =
+ LeafExpressionExtractor::new(target_schema.as_ref(), alias_generator);
+
+ // Transform expressions
+ let transformed = plan.map_expressions(|expr| extractor.extract(expr))?;
+
+ if !extractor.has_extractions() {
+ return Ok(transformed);
+ }
+
+ // Build extraction projection with ALL columns (CSE-style)
+ let extraction_proj = if let Some(existing_proj) =
get_leaf_projection(&target) {
+ merge_into_leaf_projection(existing_proj, &extractor)?
+ } else {
+ extractor.build_projection_with_all_columns(target)?
+ };
+
+ // Rebuild the path from target back up to our node's input
+ let rebuilt_input = rebuild_path(path,
LogicalPlan::Projection(extraction_proj))?;
+
+ // Create the node with new input
+ let new_inputs: Vec<LogicalPlan> = std::iter::once(rebuilt_input)
+ .chain(
+ transformed
+ .data
+ .inputs()
+ .iter()
+ .skip(1)
+ .map(|p| (*p).clone()),
+ )
+ .collect();
+
+ let new_plan = transformed
+ .data
+ .with_new_exprs(transformed.data.expressions(), new_inputs)?;
+
+ // Use CSE's pattern: add recovery projection to restore original schema
+ let recovered = build_recover_project_plan(input_schema.as_ref(),
new_plan)?;
+
+ Ok(Transformed::yes(recovered))
+}
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from Aggregate nodes.
+///
+/// For Aggregates, we extract from:
+/// - Group-by expressions (full expressions or sub-expressions)
+/// - Arguments inside aggregate functions (NOT the aggregate function itself)
+///
+/// Uses CSE's two-level pattern with NamePreserver for stable name handling.
+fn extract_from_aggregate(
+ plan: LogicalPlan,
+ alias_generator: &Arc<AliasGenerator>,
+) -> Result<Transformed<LogicalPlan>> {
+ let LogicalPlan::Aggregate(agg) = plan else {
+ return Ok(Transformed::no(plan));
+ };
+
+ // Save original expression names using NamePreserver (like CSE)
+ let name_preserver = NamePreserver::new_for_projection();
+ let saved_group_names: Vec<_> = agg
+ .group_expr
+ .iter()
+ .map(|e| name_preserver.save(e))
+ .collect();
+ let saved_aggr_names: Vec<_> = agg
+ .aggr_expr
+ .iter()
+ .map(|e| name_preserver.save(e))
+ .collect();
+
+ // Find where to place extractions
+ let (target, path) = find_extraction_target(&agg.input);
+ let target_schema = Arc::clone(target.schema());
+
+ let mut extractor =
+ LeafExpressionExtractor::new(target_schema.as_ref(), alias_generator);
+
+ // Extract from group-by expressions
+ let mut new_group_by = Vec::with_capacity(agg.group_expr.len());
+ let mut has_extractions = false;
+
+ for expr in &agg.group_expr {
+ let transformed = extractor.extract(expr.clone())?;
+ if transformed.transformed {
+ has_extractions = true;
+ }
+ new_group_by.push(transformed.data);
+ }
+
+ // Extract from aggregate function arguments (not the function itself)
+ let mut new_aggr = Vec::with_capacity(agg.aggr_expr.len());
+
+ for expr in &agg.aggr_expr {
+ let transformed = extract_from_aggregate_args(expr.clone(), &mut
extractor)?;
+ if transformed.transformed {
+ has_extractions = true;
+ }
+ new_aggr.push(transformed.data);
+ }
+
+ if !has_extractions {
+ return Ok(Transformed::no(LogicalPlan::Aggregate(agg)));
+ }
+
+ // Build extraction projection with ALL columns (CSE-style)
+ let extraction_proj = if let Some(existing_proj) =
get_leaf_projection(&target) {
+ merge_into_leaf_projection(existing_proj, &extractor)?
+ } else {
+ extractor.build_projection_with_all_columns(target)?
+ };
+
+ // Rebuild path from target back up
+ let rebuilt_input = rebuild_path(path,
LogicalPlan::Projection(extraction_proj))?;
+
+ // Restore names in group-by expressions using NamePreserver
+ let restored_group_expr: Vec<Expr> = new_group_by
+ .into_iter()
+ .zip(saved_group_names)
+ .map(|(expr, saved)| saved.restore(expr))
+ .collect();
+
+ // Restore names in aggregate expressions using NamePreserver
+ let restored_aggr_expr: Vec<Expr> = new_aggr
+ .into_iter()
+ .zip(saved_aggr_names)
+ .map(|(expr, saved)| saved.restore(expr))
+ .collect();
+
+ // Create new Aggregate with restored names
+ // (no outer projection needed if names are properly preserved)
+ let new_agg = datafusion_expr::logical_plan::Aggregate::try_new(
+ Arc::new(rebuilt_input),
+ restored_group_expr,
+ restored_aggr_expr,
+ )?;
+
+ Ok(Transformed::yes(LogicalPlan::Aggregate(new_agg)))
+}
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from Projection nodes.
+///
+/// Uses CSE's two-level pattern (outer + inner projections only):
+/// - Inner projection: extraction with ALL columns passed through
+/// - Outer projection: rewritten expressions with restored names
+///
+/// This avoids the unstable 3-level structure that gets broken by
OptimizeProjections.
+fn extract_from_projection(
+ plan: LogicalPlan,
+ alias_generator: &Arc<AliasGenerator>,
+) -> Result<Transformed<LogicalPlan>> {
+ let LogicalPlan::Projection(proj) = plan else {
+ return Ok(Transformed::no(plan));
+ };
+
+ // Skip if this projection is fully extracted (only column references)
+ if is_fully_extracted(&proj) {
+ return Ok(Transformed::no(LogicalPlan::Projection(proj)));
+ }
+
+ // Skip if this is already a leaf projection (contains __leaf_* aliases).
+ // This prevents re-extraction on subsequent optimizer passes.
+ if is_leaf_projection(&proj) {
+ return Ok(Transformed::no(LogicalPlan::Projection(proj)));
+ }
+
+ // Save original expression names using NamePreserver (like CSE)
+ let name_preserver = NamePreserver::new_for_projection();
+ let saved_names: Vec<_> = proj.expr.iter().map(|e|
name_preserver.save(e)).collect();
+
+ // Find where to place extractions (look down through schema-preserving
nodes)
+ let (target, path) = find_extraction_target(&proj.input);
+ let target_schema = Arc::clone(target.schema());
+
+ let mut extractor =
+ LeafExpressionExtractor::new(target_schema.as_ref(), alias_generator);
+
+ // Extract from projection expressions
+ let mut new_exprs = Vec::with_capacity(proj.expr.len());
+ let mut has_extractions = false;
+
+ for expr in &proj.expr {
+ let transformed = extractor.extract(expr.clone())?;
+ if transformed.transformed {
+ has_extractions = true;
+ }
+ new_exprs.push(transformed.data);
+ }
+
+ if !has_extractions {
+ return Ok(Transformed::no(LogicalPlan::Projection(proj)));
+ }
+
+ // Build extraction projection with ALL columns (CSE-style)
+ let extraction_proj = if let Some(existing_proj) =
get_leaf_projection(&target) {
+ merge_into_leaf_projection(existing_proj, &extractor)?
+ } else {
+ extractor.build_projection_with_all_columns(target)?
+ };
+
+ // Rebuild path from target back up
+ let rebuilt_input = rebuild_path(path,
LogicalPlan::Projection(extraction_proj))?;
+
+ // Create outer projection with rewritten exprs + restored names
+ let final_exprs: Vec<Expr> = new_exprs
+ .into_iter()
+ .zip(saved_names)
+ .map(|(expr, saved_name)| saved_name.restore(expr))
+ .collect();
+
+ let outer_projection = Projection::try_new(final_exprs,
Arc::new(rebuilt_input))?;
+
+ Ok(Transformed::yes(LogicalPlan::Projection(outer_projection)))
+}
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from aggregate function
arguments.
+///
+/// This extracts from inside the aggregate (e.g., from `sum(get_field(x,
'y'))`
+/// we extract `get_field(x, 'y')`), but NOT the aggregate function itself.
+fn extract_from_aggregate_args(
+ expr: Expr,
+ extractor: &mut LeafExpressionExtractor,
+) -> Result<Transformed<Expr>> {
+ match expr {
+ Expr::AggregateFunction(mut agg_func) => {
+ // Extract from arguments, not the function itself
+ let mut any_changed = false;
+ let mut new_args = Vec::with_capacity(agg_func.params.args.len());
+
+ for arg in agg_func.params.args {
+ let transformed = extractor.extract(arg)?;
+ if transformed.transformed {
+ any_changed = true;
+ }
+ new_args.push(transformed.data);
+ }
+
+ if any_changed {
+ agg_func.params.args = new_args;
+ Ok(Transformed::yes(Expr::AggregateFunction(agg_func)))
+ } else {
+ agg_func.params.args = new_args;
+ Ok(Transformed::no(Expr::AggregateFunction(agg_func)))
+ }
+ }
+ // For aliased aggregates, process the inner expression
+ Expr::Alias(alias) => {
+ let transformed = extract_from_aggregate_args(*alias.expr,
extractor)?;
+ Ok(
+ transformed
+ .update_data(|e| e.alias_qualified(alias.relation,
alias.name)),
+ )
+ }
+ // For other expressions, use regular extraction
+ other => extractor.extract(other),
+ }
+}
+
+// =============================================================================
+// Helper Functions for BottomUp Traversal
+// =============================================================================
+
+/// Traverses down through schema-preserving nodes to find where to place
extractions.
+///
+/// Returns (target_node, path_to_rebuild) where:
+/// - target_node: the node above which to create extraction projection
+/// - path_to_rebuild: nodes between our input and target that must be rebuilt
+///
+/// Schema-preserving nodes that we can look through:
+/// - Filter, Sort, Limit: pass all input columns through unchanged
+/// - Passthrough projections: only column references
+///
+/// Barrier nodes where we stop:
+/// - TableScan, Join, Aggregate: these are extraction targets
+/// - Existing __leaf_* projections: we merge into these
+/// - Any other node type
+fn find_extraction_target(
+ input: &Arc<LogicalPlan>,
+) -> (Arc<LogicalPlan>, Vec<LogicalPlan>) {
+ let mut current = Arc::clone(input);
+ let mut path = vec![];
+
+ loop {
+ match current.as_ref() {
+ // Look through schema-preserving nodes
+ LogicalPlan::Filter(f) => {
+ path.push(current.as_ref().clone());
+ current = Arc::clone(&f.input);
+ }
+ LogicalPlan::Sort(s) => {
+ path.push(current.as_ref().clone());
+ current = Arc::clone(&s.input);
+ }
+ LogicalPlan::Limit(l) => {
+ path.push(current.as_ref().clone());
+ current = Arc::clone(&l.input);
+ }
+ // Look through passthrough projections (only column references)
+ LogicalPlan::Projection(p) if is_passthrough_projection(p) => {
+ path.push(current.as_ref().clone());
+ current = Arc::clone(&p.input);
+ }
+ // Found existing __leaf_* projection - will merge into it
+ LogicalPlan::Projection(p) if is_leaf_projection(p) => {
+ return (current, path);
+ }
+ // Hit a barrier node - create new projection here
+ _ => {
+ return (current, path);
+ }
+ }
+ }
+}
+
+/// Returns true if the projection contains `__leaf_*` expressions (created by
us).
+fn is_leaf_projection(proj: &Projection) -> bool {
+ proj.expr.iter().any(|e| {
+ if let Expr::Alias(alias) = e {
+ alias.name.starts_with("__leaf")
+ } else {
+ false
+ }
+ })
+}
+
+/// Returns true if the projection is a passthrough (only column references).
+fn is_passthrough_projection(proj: &Projection) -> bool {
+ proj.expr.iter().all(|e| matches!(e, Expr::Column(_)))
+}
+
+/// Returns true if the projection only has column references (nothing to
extract).
+fn is_fully_extracted(proj: &Projection) -> bool {
+ proj.expr.iter().all(|e| {
+ matches!(e, Expr::Column(_))
+ || matches!(e, Expr::Alias(a) if matches!(a.expr.as_ref(),
Expr::Column(_)))
+ })
+}
+
+/// If the target is a leaf projection, return it for merging.
+fn get_leaf_projection(target: &Arc<LogicalPlan>) -> Option<&Projection> {
+ if let LogicalPlan::Projection(p) = target.as_ref()
+ && is_leaf_projection(p)
+ {
+ return Some(p);
+ }
+ None
+}
+
+/// Merges new extractions into an existing __leaf_* projection.
+fn merge_into_leaf_projection(
+ existing: &Projection,
+ extractor: &LeafExpressionExtractor,
+) -> Result<Projection> {
+ let mut proj_exprs = existing.expr.clone();
+
+ // Build a map of existing expressions (by schema_name) to their aliases
+ let existing_extractions: IndexMap<String, String> = existing
+ .expr
+ .iter()
+ .filter_map(|e| {
+ if let Expr::Alias(alias) = e
+ && alias.name.starts_with("__leaf")
+ {
+ let schema_name = alias.expr.schema_name().to_string();
+ return Some((schema_name, alias.name.clone()));
+ }
+ None
+ })
+ .collect();
+
+ // Add new extracted expressions, but only if not already present
+ for (schema_name, (expr, alias)) in &extractor.extracted {
+ if !existing_extractions.contains_key(schema_name) {
+ proj_exprs.push(expr.clone().alias(alias));
+ }
+ }
+
+ // Add any new pass-through columns that aren't already in the projection
+ let existing_cols: IndexSet<Column> = existing
+ .expr
+ .iter()
+ .filter_map(|e| {
+ if let Expr::Column(c) = e {
+ Some(c.clone())
+ } else {
+ None
+ }
+ })
+ .collect();
+
+ for col in &extractor.columns_needed {
+ if !existing_cols.contains(col) &&
extractor.input_schema.has_column(col) {
+ proj_exprs.push(Expr::Column(col.clone()));
+ }
+ }
+
+ Projection::try_new(proj_exprs, Arc::clone(&existing.input))
+}
+
+/// Rebuilds the path from extraction projection back up to original input.
+///
+/// Takes a list of nodes (in top-to-bottom order from input towards target)
+/// and rebuilds them with the new bottom input.
+///
+/// For passthrough projections, we update them to include ALL columns from
+/// the new input (including any new `__leaf_*` columns that were merged).
+fn rebuild_path(path: Vec<LogicalPlan>, new_bottom: LogicalPlan) ->
Result<LogicalPlan> {
+ let mut current = new_bottom;
+
+ // Rebuild path from bottom to top (reverse order)
+ for node in path.into_iter().rev() {
+ current = match node {
+ LogicalPlan::Filter(f) => {
+ LogicalPlan::Filter(Filter::try_new(f.predicate,
Arc::new(current))?)
+ }
+ LogicalPlan::Sort(s) => LogicalPlan::Sort(Sort {
+ expr: s.expr,
+ input: Arc::new(current),
+ fetch: s.fetch,
+ }),
+ LogicalPlan::Limit(l) => LogicalPlan::Limit(Limit {
+ skip: l.skip,
+ fetch: l.fetch,
+ input: Arc::new(current),
+ }),
+ LogicalPlan::Projection(p) if is_passthrough_projection(&p) => {
+ // For passthrough projections, include ALL columns from new
input
+ // This ensures new __leaf_* columns flow through
+ let new_exprs: Vec<Expr> = current
+ .schema()
+ .columns()
+ .into_iter()
+ .map(Expr::Column)
+ .collect();
+ LogicalPlan::Projection(Projection::try_new(
+ new_exprs,
+ Arc::new(current),
+ )?)
+ }
+ LogicalPlan::Projection(p) => {
+ LogicalPlan::Projection(Projection::try_new(p.expr,
Arc::new(current))?)
+ }
+ // Should not happen based on find_extraction_target, but handle
gracefully
+ other => other.with_new_exprs(other.expressions(), vec![current])?,
+ };
+ }
+
+ Ok(current)
+}
+
+/// Build projection to restore original schema (like CSE's
build_recover_project_plan).
+///
+/// This adds a projection that selects only the columns from the original
schema,
+/// hiding any intermediate `__leaf_*` columns that were added during
extraction.
+fn build_recover_project_plan(
+ schema: &DFSchema,
+ input: LogicalPlan,
+) -> Result<LogicalPlan> {
+ let col_exprs: Vec<Expr> = schema.iter().map(Expr::from).collect();
+ let projection = Projection::try_new(col_exprs, Arc::new(input))?;
+ Ok(LogicalPlan::Projection(projection))
+}
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from larger expressions.
+struct LeafExpressionExtractor<'a> {
+ /// Extracted expressions: maps schema_name -> (original_expr, alias)
+ extracted: IndexMap<String, (Expr, String)>,
+ /// Columns needed for pass-through
+ columns_needed: IndexSet<Column>,
+ /// Input schema
+ input_schema: &'a DFSchema,
+ /// Alias generator
+ alias_generator: &'a Arc<AliasGenerator>,
+}
+
+impl<'a> LeafExpressionExtractor<'a> {
+ fn new(input_schema: &'a DFSchema, alias_generator: &'a
Arc<AliasGenerator>) -> Self {
+ Self {
+ extracted: IndexMap::new(),
+ columns_needed: IndexSet::new(),
+ input_schema,
+ alias_generator,
+ }
+ }
+
+ /// Extracts `MoveTowardsLeafNodes` sub-expressions, returning rewritten
expression.
+ fn extract(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
+ // Walk top-down to find MoveTowardsLeafNodes sub-expressions
+ expr.transform_down(|e| {
+ // Skip expressions already aliased with __leaf_* pattern.
+ // These were created by a previous extraction pass and should not
be
+ // extracted again. Use TreeNodeRecursion::Jump to skip children.
+ if let Expr::Alias(alias) = &e
+ && alias.name.starts_with("__leaf")
+ {
+ return Ok(Transformed {
+ data: e,
+ transformed: false,
+ tnr: TreeNodeRecursion::Jump,
+ });
+ }
+
+ match e.placement() {
+ ExpressionPlacement::MoveTowardsLeafNodes => {
+ // Extract this entire sub-tree
+ let col_ref = self.add_extracted(e)?;
+ Ok(Transformed::yes(col_ref))
+ }
+ ExpressionPlacement::Column => {
+ // Track columns for pass-through
+ if let Expr::Column(col) = &e {
+ self.columns_needed.insert(col.clone());
+ }
+ Ok(Transformed::no(e))
+ }
+ _ => {
+ // Continue recursing into children
+ Ok(Transformed::no(e))
+ }
+ }
+ })
+ }
+
+ /// Adds an expression to extracted set, returns column reference.
+ fn add_extracted(&mut self, expr: Expr) -> Result<Expr> {
+ let schema_name = expr.schema_name().to_string();
+
+ // Deduplication: reuse existing alias if same expression
+ if let Some((_, alias)) = self.extracted.get(&schema_name) {
+ return Ok(Expr::Column(Column::new_unqualified(alias)));
+ }
+
+ // Track columns referenced by this expression
+ for col in expr.column_refs() {
+ self.columns_needed.insert(col.clone());
+ }
+
+ // Generate unique alias
+ let alias = self.alias_generator.next("__leaf");
Review Comment:
The magic string `__leaf` is used here and in multiple other places.
Consider defining it as a module-level or crate-level constant to centralize
this value and make it easier to change if needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]