alamb commented on code in PR #20117:
URL: https://github.com/apache/datafusion/pull/20117#discussion_r2760375603
##########
datafusion/expr/src/expr.rs:
##########
Review Comment:
I really like the name `ExpressionPlacement` 👍
##########
datafusion/sqllogictest/test_files/projection_pushdown.slt:
##########
@@ -320,7 +323,7 @@ EXPLAIN SELECT id, s['value'] + 1 FROM simple_struct ORDER BY id;
----
logical_plan
01)Sort: simple_struct.id ASC NULLS LAST
-02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) + Int64(1)
+02)--Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) + Int64(1) AS simple_struct.s[value] + Int64(1)
Review Comment:
I wonder if there is some way to avoid adding an alias like this when the
projected name doesn't change. It would make the plans simpler to read.
Perhaps we can address this in a follow-on PR.
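One possible approach, sketched on a hypothetical toy `Expr` type rather than DataFusion's real one: after rewriting, compare the rewritten expression's output name with the saved original name and only add an alias when the two differ. In the PR, names are restored via `NamePreserver`; this sketch does not use it, and the `schema_name` naming scheme below is an assumption chosen to mimic names like `simple_struct.s[value]` in the plans above.

```rust
/// Hypothetical toy expression type (not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    GetField(Box<Expr>, String),
    PlusInt(Box<Expr>, i64),
    Alias(Box<Expr>, String),
}

impl Expr {
    /// Output name, loosely mimicking names like `simple_struct.s[value]`.
    fn schema_name(&self) -> String {
        match self {
            Expr::Column(name) => name.clone(),
            Expr::GetField(inner, field) => format!("{}[{}]", inner.schema_name(), field),
            Expr::PlusInt(inner, n) => format!("{} + Int64({})", inner.schema_name(), n),
            Expr::Alias(_, name) => name.clone(),
        }
    }
}

/// Re-applies the saved output name only if the rewritten expression
/// would otherwise be named differently.
fn restore_name(saved_name: &str, rewritten: Expr) -> Expr {
    if rewritten.schema_name() == saved_name {
        // Name is unchanged: no alias needed.
        rewritten
    } else {
        Expr::Alias(Box::new(rewritten), saved_name.to_string())
    }
}
```

With this check, an unchanged `get_field(...) + Int64(1)` keeps its plain form, while a rewritten `__datafusion_extracted_1 + Int64(1)` still gets `AS simple_struct.s[value] + Int64(1)`.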
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -1295,6 +1297,18 @@ fn rewrite_projection(
predicates: Vec<Expr>,
mut projection: Projection,
) -> Result<(Transformed<LogicalPlan>, Option<Expr>)> {
+ // Note: This check coordinates with ExtractLeafExpressions optimizer rule.
+ // See extract_leaf_expressions.rs for details on why these projections exist.
+ // Don't push filters through extracted expression projections.
+ // Pushing filters through would rewrite expressions like `__datafusion_extracted_1 > 150`
+ // back to `get_field(s,'value') > 150`, undoing the extraction.
+ if is_extracted_expr_projection(&projection) {
Review Comment:
I think the comment would make the need for this special case easier to
understand if it included the great example from your comment:
>
> ```
> Projection: col('col')
>   Filter: __datafusion_extracted_1 > 1
>     Projection: get_field(col, 'foo') as __datafusion_extracted_1, col
>       TableScan: projection=[col]
> ```
>
> Then this rule (`PushDownFilter`) runs and will produce:
>
> ```
> Projection: col('col')
>   Projection: get_field(col, 'foo') as __datafusion_extracted_1, col
>     Filter: get_field(col, 'foo') > 1
>       TableScan: projection=[col]
> ```
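The special case boils down to a predicate on the projection. A minimal sketch, assuming (this thread does not confirm it) that a projection counts as "extracted" when at least one of its expressions is an alias whose name starts with the extraction prefix, and that the prefix is `__datafusion_extracted`, as the plan output suggests. The toy `Expr`, `looks_like_extracted_projection`, and `can_push_filter_through` are hypothetical stand-ins for the real `is_extracted_expr_projection` in `utils.rs`.

```rust
/// Assumed prefix value, inferred from names like `__datafusion_extracted_1`.
const EXTRACTED_EXPR_PREFIX: &str = "__datafusion_extracted";

/// Hypothetical toy expression type (not DataFusion's `Expr`).
#[derive(Debug, Clone)]
enum Expr {
    Column(String),
    GetField(Box<Expr>, String),
    Alias(Box<Expr>, String),
}

/// True if any projection expression is an alias produced by the
/// extraction rule (identified here only by its name prefix).
fn looks_like_extracted_projection(exprs: &[Expr]) -> bool {
    exprs.iter().any(|e| {
        matches!(e, Expr::Alias(_, name) if name.starts_with(EXTRACTED_EXPR_PREFIX))
    })
}

/// Filter pushdown stops above an extracted projection; otherwise it would
/// inline `get_field(col, 'foo')` back into the predicate and undo the
/// extraction.
fn can_push_filter_through(projection: &[Expr]) -> bool {
    !looks_like_extracted_projection(projection)
}
```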
##########
datafusion/sqllogictest/test_files/explain.slt:
##########
Review Comment:
😓 that is a lot of rewrites (not related to this PR, I am just thinking
about planning speed in general)
##########
datafusion/sqllogictest/test_files/projection_pushdown.slt:
##########
@@ -241,13 +242,14 @@ query TT
EXPLAIN SELECT id, s['value'] + 1 FROM simple_struct WHERE id > 2;
----
logical_plan
-01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) + Int64(1)
+01)Projection: simple_struct.id, __datafusion_extracted_1 + Int64(1) AS simple_struct.s[value] + Int64(1)
Review Comment:
thank you for putting these tests in first -- it is much easier to see what
this PR is changing by looking at the different plans
##########
datafusion/optimizer/src/extract_leaf_expressions.rs:
##########
@@ -0,0 +1,1916 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ExtractLeafExpressions`] extracts `MoveTowardsLeafNodes` sub-expressions into projections.
+//!
+//! This optimizer rule normalizes the plan so that all `MoveTowardsLeafNodes` computations
+//! (like field accessors) live in Projection nodes immediately above scan nodes, making them
+//! eligible for pushdown by the `OptimizeProjections` rule.
+//!
+//! ## Algorithm
+//!
+//! This rule uses **TopDown** traversal with projection merging:
+//!
+//! 1. When encountering a projection with `MoveTowardsLeafNodes` expressions, look at its input
+//! 2. If input is a Projection, **merge** the expressions through it using column replacement
+//! 3. Continue until we hit a barrier node (TableScan, Join, Aggregate)
+//! 4. Idempotency is natural: merged expressions no longer have column refs matching projection outputs
+//!
+//! ### Special Cases
+//!
+//! - If ALL expressions in a projection are `MoveTowardsLeafNodes`, push the entire projection down
+//! - If NO expressions are `MoveTowardsLeafNodes`, return `Transformed::no`
+//!
+//! ### Node Classification
+//!
+//! **Barrier Nodes** (stop pushing, create projection above):
+//! - `TableScan` - the leaf, ideal extraction point
+//! - `Join` - requires routing to left/right sides
+//! - `Aggregate` - changes schema semantics
+//! - `SubqueryAlias` - scope boundary
+//! - `Union`, `Intersect`, `Except` - schema boundaries
+//!
+//! **Schema-Preserving Nodes** (push through unchanged):
+//! - `Filter` - passes all input columns through
+//! - `Sort` - passes all input columns through
+//! - `Limit` - passes all input columns through
+//!
+//! **Projection Nodes** (merge through):
+//! - Replace column refs with underlying expressions from the child projection
+
+use indexmap::{IndexMap, IndexSet};
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use datafusion_common::alias::AliasGenerator;
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::expr_rewriter::NamePreserver;
+use datafusion_expr::logical_plan::LogicalPlan;
+use datafusion_expr::{Expr, ExpressionPlacement, Filter, Limit, Projection, Sort};
+
+use crate::optimizer::ApplyOrder;
+use crate::push_down_filter::replace_cols_by_name;
+use crate::utils::{EXTRACTED_EXPR_PREFIX, has_all_column_refs};
+use crate::{OptimizerConfig, OptimizerRule};
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from all nodes into projections.
+///
+/// This normalizes the plan so that all `MoveTowardsLeafNodes` computations (like field
+/// accessors) live in Projection nodes, making them eligible for pushdown.
Review Comment:
What does "live in Projection nodes" mean here? Does it mean that all
`MoveTowardsLeafNodes` computations appear as top-level `Expr`s in a
ProjectionExec?
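For the "merge through" step in the module docs above (step 2, plus the **Projection Nodes** bullet), a minimal sketch may help. It uses a hypothetical toy `Expr` and represents the child projection as `(output_name, defining_expr)` pairs with aliases already stripped; `replace_cols` and `merge_through` are illustrative names, not the PR's `replace_cols_by_name`. Each column the parent references is swapped for the expression that defines it in the child, so the merged expression can be evaluated directly against the child's input.

```rust
use std::collections::HashMap;

/// Hypothetical toy expression type (not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    GetField(Box<Expr>, String),
}

/// Replaces each column reference with the expression that defines it in
/// the child projection; columns the child does not produce are kept.
fn replace_cols(expr: &Expr, defs: &HashMap<&str, &Expr>) -> Expr {
    match expr {
        Expr::Column(name) => match defs.get(name.as_str()) {
            Some(def) => (*def).clone(),
            None => expr.clone(),
        },
        Expr::GetField(inner, field) => {
            Expr::GetField(Box::new(replace_cols(inner, defs)), field.clone())
        }
    }
}

/// Merges the parent's expressions through a child projection given as
/// `(output_name, defining_expr)` pairs.
fn merge_through(parent: &[Expr], child: &[(String, Expr)]) -> Vec<Expr> {
    let defs: HashMap<&str, &Expr> = child
        .iter()
        .map(|(name, e)| (name.as_str(), e))
        .collect();
    parent.iter().map(|e| replace_cols(e, &defs)).collect()
}
```

For a child `Projection: s AS x, id`, the parent expression `get_field(x, 'value')` becomes `get_field(s, 'value')`.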
##########
datafusion/sqllogictest/test_files/projection_pushdown.slt:
##########
@@ -1339,7 +1359,247 @@ SELECT id, s['value'] FROM simple_struct ORDER BY id, s['value'];
5 250
#####################
-# Section 12: Cleanup
+# Section 12: Join Tests - get_field Extraction from Join Nodes
+#####################
+
+# Create a second table for join tests
+statement ok
+COPY (
+ SELECT
+ column1 as id,
+ column2 as s
+ FROM VALUES
+ (1, {role: 'admin', level: 10}),
+ (2, {role: 'user', level: 5}),
+ (3, {role: 'guest', level: 1}),
+ (4, {role: 'admin', level: 8}),
+ (5, {role: 'user', level: 3})
+) TO 'test_files/scratch/projection_pushdown/join_right.parquet'
+STORED AS PARQUET;
+
+statement ok
+CREATE EXTERNAL TABLE join_right STORED AS PARQUET
+LOCATION 'test_files/scratch/projection_pushdown/join_right.parquet';
+
+###
+# Test 12.1: Join with get_field in equijoin condition
+# Tests extraction from join ON clause - get_field on each side routed appropriately
+###
+
+query TT
+EXPLAIN SELECT simple_struct.id, join_right.id
+FROM simple_struct
+INNER JOIN join_right ON simple_struct.s['value'] = join_right.s['level'] * 10;
+----
+logical_plan
+01)Projection: simple_struct.id, join_right.id
+02)--Inner Join: __datafusion_extracted_1 = __datafusion_extracted_2 * Int64(10)
+03)----Projection: get_field(simple_struct.s, Utf8("value")) AS __datafusion_extracted_1, simple_struct.id
+04)------TableScan: simple_struct projection=[id, s]
+05)----Projection: get_field(join_right.s, Utf8("level")) AS __datafusion_extracted_2, join_right.id
+06)------TableScan: join_right projection=[id, s]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(__datafusion_extracted_1@0, __datafusion_extracted_2 * Int64(10)@2)], projection=[id@1, id@3]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[get_field(s@1, value) as __datafusion_extracted_1, id], file_type=parquet
Review Comment:
it is pretty cool to see the expression extraction pushed down on both sides
of the join
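Routing an extracted expression to one side of the join comes down to checking which input provides every column it references. The PR imports a `has_all_column_refs` helper for this kind of check, but its exact signature is not shown here, so the sketch below uses plain string sets; `Side` and `route_to_side` are hypothetical names.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum Side {
    Left,
    Right,
}

/// Decides which join input an extracted expression can be pushed into,
/// given the set of columns it references. Returns `None` when the
/// expression needs columns from both sides (or references no columns),
/// in which case it stays at the join.
fn route_to_side(
    col_refs: &HashSet<&str>,
    left_cols: &HashSet<&str>,
    right_cols: &HashSet<&str>,
) -> Option<Side> {
    if col_refs.is_empty() {
        None
    } else if col_refs.is_subset(left_cols) {
        Some(Side::Left)
    } else if col_refs.is_subset(right_cols) {
        Some(Side::Right)
    } else {
        None
    }
}
```

For Test 12.1, `get_field(simple_struct.s, 'value')` only references `simple_struct.s` and is routed left, and `get_field(join_right.s, 'level')` is routed right, matching the two projections under the join.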
##########
datafusion/sqllogictest/test_files/projection_pushdown.slt:
##########
@@ -217,13 +217,14 @@ query TT
EXPLAIN SELECT id, s['value'] FROM simple_struct WHERE id > 2;
----
logical_plan
-01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value"))
+01)Projection: simple_struct.id, __datafusion_extracted_1 AS simple_struct.s[value]
02)--Filter: simple_struct.id > Int64(2)
-03)----TableScan: simple_struct projection=[id, s], partial_filters=[simple_struct.id > Int64(2)]
+03)----Projection: get_field(simple_struct.s, Utf8("value")) AS __datafusion_extracted_1, simple_struct.id
+04)------TableScan: simple_struct projection=[id, s], partial_filters=[simple_struct.id > Int64(2)]
physical_plan
-01)ProjectionExec: expr=[id@0 as id, get_field(s@1, value) as simple_struct.s[value]]
-02)--FilterExec: id@0 > 2
-03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id, s], file_type=parquet, predicate=id@0 > 2, pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 2, required_guarantees=[]
+01)ProjectionExec: expr=[id@1 as id, __datafusion_extracted_1@0 as simple_struct.s[value]]
+02)--FilterExec: id@1 > 2
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[get_field(s@1, value) as __datafusion_extracted_1, id], file_type=parquet, predicate=id@0 > 2, pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 2, required_guarantees=[]
Review Comment:
Here is the optimizer pass in action -- the `get_field` was pushed down into the scan -- the
plan looks good to me
##########
datafusion/optimizer/src/extract_leaf_expressions.rs:
##########
@@ -0,0 +1,1916 @@
+//! [`ExtractLeafExpressions`] extracts `MoveTowardsLeafNodes` sub-expressions into projections.
+//!
+//! This optimizer rule normalizes the plan so that all `MoveTowardsLeafNodes` computations
+//! (like field accessors) live in Projection nodes immediately above scan nodes, making them
+//! eligible for pushdown by the `OptimizeProjections` rule.
+//!
+//! ## Algorithm
+//!
+//! This rule uses **TopDown** traversal with projection merging:
+//!
+//! 1. When encountering a projection with `MoveTowardsLeafNodes` expressions, look at its input
+//! 2. If input is a Projection, **merge** the expressions through it using column replacement
+//! 3. Continue until we hit a barrier node (TableScan, Join, Aggregate)
+//! 4. Idempotency is natural: merged expressions no longer have column refs matching projection outputs
+//!
+//! ### Special Cases
+//!
+//! - If ALL expressions in a projection are `MoveTowardsLeafNodes`, push the entire projection down
+//! - If NO expressions are `MoveTowardsLeafNodes`, return `Transformed::no`
+//!
+//! ### Node Classification
+//!
+//! **Barrier Nodes** (stop pushing, create projection above):
+//! - `TableScan` - the leaf, ideal extraction point
+//! - `Join` - requires routing to left/right sides
+//! - `Aggregate` - changes schema semantics
+//! - `SubqueryAlias` - scope boundary
+//! - `Union`, `Intersect`, `Except` - schema boundaries
+//!
+//! **Schema-Preserving Nodes** (push through unchanged):
+//! - `Filter` - passes all input columns through
+//! - `Sort` - passes all input columns through
+//! - `Limit` - passes all input columns through
+//!
+//! **Projection Nodes** (merge through):
+//! - Replace column refs with underlying expressions from the child projection
+
+use indexmap::{IndexMap, IndexSet};
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use datafusion_common::alias::AliasGenerator;
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::expr_rewriter::NamePreserver;
+use datafusion_expr::logical_plan::LogicalPlan;
+use datafusion_expr::{Expr, ExpressionPlacement, Filter, Limit, Projection, Sort};
+
+use crate::optimizer::ApplyOrder;
+use crate::push_down_filter::replace_cols_by_name;
+use crate::utils::{EXTRACTED_EXPR_PREFIX, has_all_column_refs};
+use crate::{OptimizerConfig, OptimizerRule};
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from all nodes into projections.
+///
+/// This normalizes the plan so that all `MoveTowardsLeafNodes` computations (like field
+/// accessors) live in Projection nodes, making them eligible for pushdown.
+///
+/// # Example
+///
+/// Given a filter with a struct field access:
+///
+/// ```text
+/// Filter: user['status'] = 'active'
+/// TableScan: t [user]
+/// ```
+///
+/// This rule extracts the field access into a projection:
+///
+/// ```text
+/// Filter: __datafusion_extracted_1 = 'active'
+/// Projection: user['status'] AS __datafusion_extracted_1, user
+/// TableScan: t [user]
+/// ```
+///
+/// The `OptimizeProjections` rule can then push this projection down to the scan.
+///
+/// **Important:** The `PushDownFilter` rule is aware of projections created by this rule
+/// and will not push filters through them. See `is_extracted_expr_projection` in utils.rs.
Review Comment:
Would be nice to make this a link too, so it is checked automatically by
rustdoc rather than getting out of sync:
```suggestion
/// and will not push filters through them. See [`is_extracted_expr_projection`]
```
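The extraction in the doc comment's example (replacing `user['status']` with `__datafusion_extracted_1` and adding a projection below the filter) can be sketched as follows. This is a toy model under stated assumptions: `GetField` stands in for any `MoveTowardsLeafNodes` expression, a simple counter stands in for the `AliasGenerator`, and `Extractor` is a hypothetical name rather than the PR's `LeafExpressionExtractor`.

```rust
/// Hypothetical toy expression type (not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    GetField(Box<Expr>, String),
    Literal(String),
    Eq(Box<Expr>, Box<Expr>),
}

/// Collects field accesses and replaces them with references to fresh aliases.
struct Extractor {
    /// Stand-in for DataFusion's `AliasGenerator`.
    next_id: usize,
    /// `(alias, extracted expression)` pairs in extraction order.
    extracted: Vec<(String, Expr)>,
}

impl Extractor {
    fn new() -> Self {
        Self { next_id: 1, extracted: Vec::new() }
    }

    /// Rewrites `expr`, replacing every `GetField` sub-tree with a column
    /// reference; identical sub-trees share one alias.
    fn extract(&mut self, expr: Expr) -> Expr {
        match expr {
            Expr::GetField(..) => {
                if let Some((alias, _)) = self.extracted.iter().find(|(_, e)| *e == expr) {
                    return Expr::Column(alias.clone());
                }
                let alias = format!("__datafusion_extracted_{}", self.next_id);
                self.next_id += 1;
                self.extracted.push((alias.clone(), expr));
                Expr::Column(alias)
            }
            Expr::Eq(l, r) => {
                let l = self.extract(*l);
                let r = self.extract(*r);
                Expr::Eq(Box::new(l), Box::new(r))
            }
            other => other,
        }
    }

    /// Projection to insert below the node as `(output_name, expr)` pairs:
    /// each extracted expression under its alias, then every input column
    /// passed through so the node above keeps its original schema.
    fn build_projection(&self, input_columns: &[&str]) -> Vec<(String, Expr)> {
        self.extracted
            .iter()
            .cloned()
            .chain(
                input_columns
                    .iter()
                    .map(|c| (c.to_string(), Expr::Column(c.to_string()))),
            )
            .collect()
    }
}
```

Extracting `user['status'] = 'active'` yields `__datafusion_extracted_1 = 'active'` and the projection `user['status'] AS __datafusion_extracted_1, user`, mirroring the doc example.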
##########
datafusion/optimizer/src/extract_leaf_expressions.rs:
##########
@@ -0,0 +1,1916 @@
+//! [`ExtractLeafExpressions`] extracts `MoveTowardsLeafNodes` sub-expressions into projections.
+//!
+//! This optimizer rule normalizes the plan so that all `MoveTowardsLeafNodes` computations
+//! (like field accessors) live in Projection nodes immediately above scan nodes, making them
+//! eligible for pushdown by the `OptimizeProjections` rule.
+//!
+//! ## Algorithm
+//!
+//! This rule uses **TopDown** traversal with projection merging:
+//!
+//! 1. When encountering a projection with `MoveTowardsLeafNodes` expressions, look at its input
+//! 2. If input is a Projection, **merge** the expressions through it using column replacement
+//! 3. Continue until we hit a barrier node (TableScan, Join, Aggregate)
+//! 4. Idempotency is natural: merged expressions no longer have column refs matching projection outputs
+//!
+//! ### Special Cases
+//!
+//! - If ALL expressions in a projection are `MoveTowardsLeafNodes`, push the entire projection down
+//! - If NO expressions are `MoveTowardsLeafNodes`, return `Transformed::no`
+//!
+//! ### Node Classification
+//!
+//! **Barrier Nodes** (stop pushing, create projection above):
+//! - `TableScan` - the leaf, ideal extraction point
+//! - `Join` - requires routing to left/right sides
+//! - `Aggregate` - changes schema semantics
+//! - `SubqueryAlias` - scope boundary
+//! - `Union`, `Intersect`, `Except` - schema boundaries
+//!
+//! **Schema-Preserving Nodes** (push through unchanged):
+//! - `Filter` - passes all input columns through
+//! - `Sort` - passes all input columns through
+//! - `Limit` - passes all input columns through
+//!
+//! **Projection Nodes** (merge through):
+//! - Replace column refs with underlying expressions from the child projection
Review Comment:
Is there a reason to split the comments between the module and the struct?
It might make sense to keep the module-level comments relatively minimal
and move `## Algorithm` and everything else down to the doc comment on
`ExtractLeafExpressions`, so the algorithm and examples are close together.
##########
datafusion/optimizer/src/extract_leaf_expressions.rs:
##########
@@ -0,0 +1,1916 @@
+//! [`ExtractLeafExpressions`] extracts `MoveTowardsLeafNodes` sub-expressions into projections.
Review Comment:
Thank you for the comments.
I think it would help to make `MoveTowardsLeafNodes` a doc link, i.e.
`ExpressionPlacement::MoveTowardsLeafNodes`
##########
datafusion/sqllogictest/test_files/push_down_filter.slt:
##########
@@ -116,11 +116,12 @@ explain select * from (select column1, unnest(column2) as o from d) where o['a']
----
physical_plan
01)ProjectionExec: expr=[column1@0 as column1, __unnest_placeholder(d.column2,depth=1)@1 as o]
-02)--FilterExec: get_field(__unnest_placeholder(d.column2,depth=1)@1, a) = 1
+02)--FilterExec: __datafusion_extracted_1@0 = 1, projection=[column1@1, __unnest_placeholder(d.column2,depth=1)@2]
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-04)------UnnestExec
-05)--------ProjectionExec: expr=[column1@0 as column1, column2@1 as __unnest_placeholder(d.column2)]
-06)----------DataSourceExec: partitions=1, partition_sizes=[1]
+04)------ProjectionExec: expr=[get_field(__unnest_placeholder(d.column2,depth=1)@1, a) as __datafusion_extracted_1, column1@0 as column1, __unnest_placeholder(d.column2,depth=1)@1 as __unnest_placeholder(d.column2,depth=1)]
+05)--------UnnestExec
Review Comment:
Since it wasn't pushed through before, I think it is fine that it (still)
isn't pushed through.
##########
datafusion/optimizer/src/extract_leaf_expressions.rs:
##########
@@ -0,0 +1,1870 @@
+//! [`ExtractLeafExpressions`] extracts `MoveTowardsLeafNodes` sub-expressions into projections.
+//!
+//! This optimizer rule normalizes the plan so that all `MoveTowardsLeafNodes` computations
+//! (like field accessors) live in Projection nodes immediately above scan nodes, making them
+//! eligible for pushdown by the `OptimizeProjections` rule.
+//!
+//! ## Algorithm
+//!
+//! This rule uses **BottomUp** traversal to push ALL `MoveTowardsLeafNodes` expressions
+//! (like `get_field`) to projections immediately above scan nodes. This enables optimal
+//! Parquet column pruning.
+//!
+//! ### Node Classification
+//!
+//! **Barrier Nodes** (stop pushing, create projection above):
+//! - `TableScan` - the leaf, ideal extraction point
+//! - `Join` - requires routing to left/right sides
+//! - `Aggregate` - changes schema semantics
+//! - `SubqueryAlias` - scope boundary
+//! - `Union`, `Intersect`, `Except` - schema boundaries
+//!
+//! **Schema-Preserving Nodes** (push through):
+//! - `Filter` - passes all input columns through
+//! - `Sort` - passes all input columns through
+//! - `Limit` - passes all input columns through
+//! - Passthrough `Projection` - only column references
+//!
+//! ### How It Works
+//!
+//! 1. Process leaf nodes first (TableScan, etc.)
+//! 2. When processing higher nodes, descendants are already finalized
+//! 3. Push extractions DOWN through the plan, merging into existing extracted
+//! expression projections when possible
+
+use indexmap::{IndexMap, IndexSet};
+use std::sync::Arc;
+
+use datafusion_common::alias::AliasGenerator;
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::expr_rewriter::NamePreserver;
+use datafusion_expr::logical_plan::LogicalPlan;
+use datafusion_expr::{Expr, ExpressionPlacement, Filter, Limit, Projection, Sort};
+
+use crate::optimizer::ApplyOrder;
+use crate::utils::{EXTRACTED_EXPR_PREFIX, has_all_column_refs, is_extracted_expr_projection};
+use crate::{OptimizerConfig, OptimizerRule};
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from all nodes into projections.
+///
+/// This normalizes the plan so that all `MoveTowardsLeafNodes` computations (like field
+/// accessors) live in Projection nodes, making them eligible for pushdown.
+///
+/// # Example
+///
+/// Given a filter with a struct field access:
+///
+/// ```text
+/// Filter: user['status'] = 'active'
+/// TableScan: t [user]
+/// ```
+///
+/// This rule extracts the field access into a projection:
+///
+/// ```text
+/// Filter: __datafusion_extracted_1 = 'active'
+/// Projection: user['status'] AS __datafusion_extracted_1, user
+/// TableScan: t [user]
+/// ```
+///
+/// The `OptimizeProjections` rule can then push this projection down to the scan.
+///
+/// **Important:** The `PushDownFilter` rule is aware of projections created by this rule
+/// and will not push filters through them. See `is_extracted_expr_projection` in utils.rs.
+#[derive(Default, Debug)]
+pub struct ExtractLeafExpressions {}
+
+impl ExtractLeafExpressions {
+ /// Create a new [`ExtractLeafExpressions`]
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl OptimizerRule for ExtractLeafExpressions {
+ fn name(&self) -> &str {
+ "extract_leaf_expressions"
+ }
+
+ fn apply_order(&self) -> Option<ApplyOrder> {
+ Some(ApplyOrder::BottomUp)
+ }
+
+ fn rewrite(
+ &self,
+ plan: LogicalPlan,
+ config: &dyn OptimizerConfig,
+ ) -> Result<Transformed<LogicalPlan>> {
+ let alias_generator = config.alias_generator();
+ extract_from_plan(plan, alias_generator)
+ }
+}
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from a plan node.
+///
+/// With BottomUp traversal, we process leaves first, then work up.
+/// This allows us to push extractions all the way down to scan nodes.
+fn extract_from_plan(
+ plan: LogicalPlan,
+ alias_generator: &Arc<AliasGenerator>,
+) -> Result<Transformed<LogicalPlan>> {
+ match &plan {
+ // Schema-preserving nodes - extract and push down
+ LogicalPlan::Filter(_) | LogicalPlan::Sort(_) | LogicalPlan::Limit(_) => {
+ extract_from_schema_preserving(plan, alias_generator)
+ }
+
+ // Schema-transforming nodes need special handling
+ LogicalPlan::Aggregate(_) => extract_from_aggregate(plan, alias_generator),
+ LogicalPlan::Projection(_) => extract_from_projection(plan, alias_generator),
+ LogicalPlan::Join(_) => extract_from_join(plan, alias_generator),
+
+ // Everything else passes through unchanged
+ _ => Ok(Transformed::no(plan)),
Review Comment:
If we used the `map_expressions` API, as suggested above, we would get support
for Extension nodes "for free".
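A rough sketch of why a uniform expression-mapping API helps: if every node type, including user-defined extension nodes, exposes its expressions through one `map_expressions`-style method, a rule that only rewrites expressions needs no per-node `match`. The toy `Plan` below (expressions are plain strings) and `rewrite_node` are hypothetical and much simpler than DataFusion's `LogicalPlan::map_expressions`, which returns `Result<Transformed<_>>` and is already used in this file by `extract_from_schema_preserving`.

```rust
/// Hypothetical toy plan; expressions are plain strings for brevity.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { table: String },
    Filter { predicate: String, input: Box<Plan> },
    Sort { keys: Vec<String>, input: Box<Plan> },
    /// A user-defined node the rule knows nothing about.
    Extension { name: String, exprs: Vec<String>, input: Box<Plan> },
}

impl Plan {
    /// Applies `f` to every expression owned by this node (not its inputs).
    fn map_expressions(self, mut f: impl FnMut(String) -> String) -> Plan {
        match self {
            Plan::Scan { table } => Plan::Scan { table },
            Plan::Filter { predicate, input } => Plan::Filter { predicate: f(predicate), input },
            Plan::Sort { keys, input } => Plan::Sort {
                keys: keys.into_iter().map(&mut f).collect(),
                input,
            },
            Plan::Extension { name, exprs, input } => Plan::Extension {
                name,
                exprs: exprs.into_iter().map(&mut f).collect(),
                input,
            },
        }
    }
}

/// A rule written only against `map_expressions`: it rewrites `Extension`
/// nodes without matching on them explicitly.
fn rewrite_node(plan: Plan) -> Plan {
    plan.map_expressions(|e| e.replace("get_field(s, 'value')", "__datafusion_extracted_1"))
}
```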
##########
datafusion/optimizer/src/extract_leaf_expressions.rs:
##########
@@ -0,0 +1,1916 @@
+//! [`ExtractLeafExpressions`] extracts `MoveTowardsLeafNodes` sub-expressions into projections.
+//!
+//! This optimizer rule normalizes the plan so that all `MoveTowardsLeafNodes` computations
+//! (like field accessors) live in Projection nodes immediately above scan nodes, making them
+//! eligible for pushdown by the `OptimizeProjections` rule.
+//!
+//! ## Algorithm
+//!
+//! This rule uses **TopDown** traversal with projection merging:
+//!
+//! 1. When encountering a projection with `MoveTowardsLeafNodes` expressions, look at its input
+//! 2. If input is a Projection, **merge** the expressions through it using column replacement
+//! 3. Continue until we hit a barrier node (TableScan, Join, Aggregate)
+//! 4. Idempotency is natural: merged expressions no longer have column refs matching projection outputs
+//!
+//! ### Special Cases
+//!
+//! - If ALL expressions in a projection are `MoveTowardsLeafNodes`, push the entire projection down
+//! - If NO expressions are `MoveTowardsLeafNodes`, return `Transformed::no`
+//!
+//! ### Node Classification
+//!
+//! **Barrier Nodes** (stop pushing, create projection above):
+//! - `TableScan` - the leaf, ideal extraction point
+//! - `Join` - requires routing to left/right sides
+//! - `Aggregate` - changes schema semantics
+//! - `SubqueryAlias` - scope boundary
+//! - `Union`, `Intersect`, `Except` - schema boundaries
+//!
+//! **Schema-Preserving Nodes** (push through unchanged):
+//! - `Filter` - passes all input columns through
+//! - `Sort` - passes all input columns through
+//! - `Limit` - passes all input columns through
+//!
+//! **Projection Nodes** (merge through):
+//! - Replace column refs with underlying expressions from the child projection
+
+use indexmap::{IndexMap, IndexSet};
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use datafusion_common::alias::AliasGenerator;
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::expr_rewriter::NamePreserver;
+use datafusion_expr::logical_plan::LogicalPlan;
+use datafusion_expr::{Expr, ExpressionPlacement, Filter, Limit, Projection, Sort};
+
+use crate::optimizer::ApplyOrder;
+use crate::push_down_filter::replace_cols_by_name;
+use crate::utils::{EXTRACTED_EXPR_PREFIX, has_all_column_refs};
+use crate::{OptimizerConfig, OptimizerRule};
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from all nodes into projections.
+///
+/// This normalizes the plan so that all `MoveTowardsLeafNodes` computations (like field
+/// accessors) live in Projection nodes, making them eligible for pushdown.
+///
+/// # Example
+///
+/// Given a filter with a struct field access:
+///
+/// ```text
+/// Filter: user['status'] = 'active'
+/// TableScan: t [user]
+/// ```
+///
+/// This rule extracts the field access into a projection:
+///
+/// ```text
+/// Filter: __datafusion_extracted_1 = 'active'
+/// Projection: user['status'] AS __datafusion_extracted_1, user
+/// TableScan: t [user]
+/// ```
+///
+/// The `OptimizeProjections` rule can then push this projection down to the scan.
+///
+/// **Important:** The `PushDownFilter` rule is aware of projections created by this rule
+/// and will not push filters through them. See `is_extracted_expr_projection` in utils.rs.
+#[derive(Default, Debug)]
+pub struct ExtractLeafExpressions {}
+
+impl ExtractLeafExpressions {
+ /// Create a new [`ExtractLeafExpressions`]
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl OptimizerRule for ExtractLeafExpressions {
+ fn name(&self) -> &str {
+ "extract_leaf_expressions"
+ }
+
+ fn apply_order(&self) -> Option<ApplyOrder> {
+ Some(ApplyOrder::TopDown)
+ }
+
+ fn rewrite(
+ &self,
+ plan: LogicalPlan,
+ config: &dyn OptimizerConfig,
+ ) -> Result<Transformed<LogicalPlan>> {
+ let alias_generator = config.alias_generator();
+ extract_from_plan(plan, alias_generator)
+ }
+}
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from a plan node.
+///
+/// With TopDown traversal, we process parent nodes first, allowing us to
+/// merge expressions through child projections.
+fn extract_from_plan(
+ plan: LogicalPlan,
+ alias_generator: &Arc<AliasGenerator>,
+) -> Result<Transformed<LogicalPlan>> {
+ match &plan {
+ // Schema-preserving nodes - extract and push down
+ LogicalPlan::Filter(_) | LogicalPlan::Sort(_) | LogicalPlan::Limit(_) => {
+ extract_from_schema_preserving(plan, alias_generator)
+ }
+
+ // Schema-transforming nodes need special handling
+ LogicalPlan::Aggregate(_) => extract_from_aggregate(plan, alias_generator),
+ LogicalPlan::Projection(_) => extract_from_projection(plan, alias_generator),
+ LogicalPlan::Join(_) => extract_from_join(plan, alias_generator),
+
+ // Everything else passes through unchanged
+ _ => Ok(Transformed::no(plan)),
+ }
+}
+
+/// Extracts from schema-preserving nodes (Filter, Sort, Limit).
+///
+/// These nodes don't change the schema, so we can extract expressions
+/// and push them down to existing extracted projections or create new ones.
+///
+/// Uses CSE's two-level pattern:
+/// 1. Inner extraction projection with ALL columns passed through
+/// 2. Outer recovery projection to restore original schema
+fn extract_from_schema_preserving(
+ plan: LogicalPlan,
+ alias_generator: &Arc<AliasGenerator>,
+) -> Result<Transformed<LogicalPlan>> {
+ // Skip nodes with no children
+ if plan.inputs().is_empty() {
+ return Ok(Transformed::no(plan));
+ }
+
+ let input = plan.inputs()[0].clone();
+ let input_schema = Arc::clone(input.schema());
+
+ // Find where to place extractions (look down through schema-preserving nodes)
+ let input_arc = Arc::new(input);
+ let (target, path) = find_extraction_target(&input_arc);
+ let target_schema = Arc::clone(target.schema());
+
+ // Extract using target schema - this is where the projection will be placed
+ let mut extractor =
+ LeafExpressionExtractor::new(target_schema.as_ref(), alias_generator);
+
+ // Transform expressions
+ let transformed = plan.map_expressions(|expr| extractor.extract(expr))?;
+
+ if !extractor.has_extractions() {
+ return Ok(transformed);
+ }
+
+ let rebuilt_input = extractor.build_extraction_projection(&target, path)?;
+
+ // Create the node with new input
+ let new_inputs: Vec<LogicalPlan> = std::iter::once(rebuilt_input)
Review Comment:
The code above seems to assume there is a single input, so it seems
strange to have code here that handles multiple inputs 🤔
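The reviewer's point can be sketched on a hypothetical toy plan type (not DataFusion's `LogicalPlan`): when only single-input, schema-preserving nodes (Filter, Sort, Limit) are walked and rebuilt, each step replaces exactly one child, and meeting any other node is an error rather than a case to loop over. `find_target`, `with_single_input` and `rebuild` are illustrative names, loosely mirroring the PR's `find_extraction_target` and rebuild step; they are not its actual code.

```rust
// Toy plan type for illustration only (hypothetical; not DataFusion's `LogicalPlan`).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { table: String },
    Filter { predicate: String, input: Box<Plan> },
    Sort { key: String, input: Box<Plan> },
    Limit { fetch: usize, input: Box<Plan> },
    Join { left: Box<Plan>, right: Box<Plan> },
}

impl Plan {
    /// The single child of a schema-preserving node (Filter, Sort, Limit), else `None`.
    fn schema_preserving_input(&self) -> Option<&Plan> {
        match self {
            Plan::Filter { input, .. } | Plan::Sort { input, .. } | Plan::Limit { input, .. } => {
                Some(&**input)
            }
            Plan::Scan { .. } | Plan::Join { .. } => None,
        }
    }

    /// Rebuild a schema-preserving node around exactly one new child.
    /// Any other node is an error rather than a case to iterate over.
    fn with_single_input(self, child: Plan) -> Result<Plan, String> {
        let input = Box::new(child);
        match self {
            Plan::Filter { predicate, .. } => Ok(Plan::Filter { predicate, input }),
            Plan::Sort { key, .. } => Ok(Plan::Sort { key, input }),
            Plan::Limit { fetch, .. } => Ok(Plan::Limit { fetch, input }),
            other => Err(format!("expected a single-input node, got {other:?}")),
        }
    }
}

/// Walk down through schema-preserving nodes. Returns the first other node
/// (the extraction target) and the nodes walked past, outermost first.
fn find_target(plan: Plan) -> (Plan, Vec<Plan>) {
    let mut path = Vec::new();
    let mut current = plan;
    while let Some(child) = current.schema_preserving_input().cloned() {
        path.push(current);
        current = child;
    }
    (current, path)
}

/// Rebuild the walked path bottom-up on top of `new_target`.
fn rebuild(new_target: Plan, path: Vec<Plan>) -> Result<Plan, String> {
    path.into_iter()
        .rev()
        .try_fold(new_target, |child, node| node.with_single_input(child))
}
```

Here `new_target` stands in for whatever replaces the original target; in the PR that would presumably be the extraction projection built over it.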
##########
datafusion/optimizer/src/extract_leaf_expressions.rs:
##########
@@ -0,0 +1,1916 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ExtractLeafExpressions`] extracts `MoveTowardsLeafNodes` sub-expressions into projections.
+//!
+//! This optimizer rule normalizes the plan so that all `MoveTowardsLeafNodes` computations
+//! (like field accessors) live in Projection nodes immediately above scan nodes, making them
+//! eligible for pushdown by the `OptimizeProjections` rule.
+//!
+//! ## Algorithm
+//!
+//! This rule uses **TopDown** traversal with projection merging:
+//!
+//! 1. When encountering a projection with `MoveTowardsLeafNodes` expressions, look at its input
+//! 2. If input is a Projection, **merge** the expressions through it using column replacement
+//! 3. Continue until we hit a barrier node (TableScan, Join, Aggregate)
+//! 4. Idempotency is natural: merged expressions no longer have column refs matching projection outputs
+//! ### Special Cases
+//!
+//! - If ALL expressions in a projection are `MoveTowardsLeafNodes`, push the entire projection down
+//! - If NO expressions are `MoveTowardsLeafNodes`, return `Transformed::no`
+//!
+//! ### Node Classification
+//!
+//! **Barrier Nodes** (stop pushing, create projection above):
+//! - `TableScan` - the leaf, ideal extraction point
+//! - `Join` - requires routing to left/right sides
+//! - `Aggregate` - changes schema semantics
+//! - `SubqueryAlias` - scope boundary
+//! - `Union`, `Intersect`, `Except` - schema boundaries
+//!
+//! **Schema-Preserving Nodes** (push through unchanged):
+//! - `Filter` - passes all input columns through
+//! - `Sort` - passes all input columns through
+//! - `Limit` - passes all input columns through
+//!
+//! **Projection Nodes** (merge through):
+//! - Replace column refs with underlying expressions from the child projection
+
+use indexmap::{IndexMap, IndexSet};
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use datafusion_common::alias::AliasGenerator;
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::expr_rewriter::NamePreserver;
+use datafusion_expr::logical_plan::LogicalPlan;
+use datafusion_expr::{Expr, ExpressionPlacement, Filter, Limit, Projection, Sort};
+
+use crate::optimizer::ApplyOrder;
+use crate::push_down_filter::replace_cols_by_name;
+use crate::utils::{EXTRACTED_EXPR_PREFIX, has_all_column_refs};
+use crate::{OptimizerConfig, OptimizerRule};
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from all nodes into projections.
+///
+/// This normalizes the plan so that all `MoveTowardsLeafNodes` computations (like field
+/// accessors) live in Projection nodes, making them eligible for pushdown.
+///
+/// # Example
+///
+/// Given a filter with a struct field access:
+///
+/// ```text
+/// Filter: user['status'] = 'active'
+/// TableScan: t [user]
+/// ```
+///
+/// This rule extracts the field access into a projection:
+///
+/// ```text
+/// Filter: __datafusion_extracted_1 = 'active'
+/// Projection: user['status'] AS __datafusion_extracted_1, user
+/// TableScan: t [user]
+/// ```
+///
+/// The `OptimizeProjections` rule can then push this projection down to the scan.
+///
+/// **Important:** The `PushDownFilter` rule is aware of projections created by this rule
+/// and will not push filters through them. See `is_extracted_expr_projection` in utils.rs.
+#[derive(Default, Debug)]
+pub struct ExtractLeafExpressions {}
+
+impl ExtractLeafExpressions {
+ /// Create a new [`ExtractLeafExpressions`]
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl OptimizerRule for ExtractLeafExpressions {
+ fn name(&self) -> &str {
+ "extract_leaf_expressions"
+ }
+
+ fn apply_order(&self) -> Option<ApplyOrder> {
+ Some(ApplyOrder::TopDown)
+ }
+
+ fn rewrite(
+ &self,
+ plan: LogicalPlan,
+ config: &dyn OptimizerConfig,
+ ) -> Result<Transformed<LogicalPlan>> {
+ let alias_generator = config.alias_generator();
+ extract_from_plan(plan, alias_generator)
+ }
+}
+
+/// Extracts `MoveTowardsLeafNodes` sub-expressions from a plan node.
+///
+/// With TopDown traversal, we process parent nodes first, allowing us to
+/// merge expressions through child projections.
+fn extract_from_plan(
+ plan: LogicalPlan,
+ alias_generator: &Arc<AliasGenerator>,
+) -> Result<Transformed<LogicalPlan>> {
+ match &plan {
+ // Schema-preserving nodes - extract and push down
+ LogicalPlan::Filter(_) | LogicalPlan::Sort(_) | LogicalPlan::Limit(_) => {
+ extract_from_schema_preserving(plan, alias_generator)
Review Comment:
I don't understand why there needs to be specialized code for different
`LogicalPlan` types. This seems like exactly the use case
[`LogicalPlan::map_expressions()`](https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html#method.map_expressions)
is designed to handle.
Couldn't you use `map_expressions` to rewrite any `get_field` expressions, and
then add the relevant projection below it?
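A rough sketch of what this suggestion could look like, on hypothetical toy `Expr`/`Plan` types rather than DataFusion's. A single `map_expressions`-style hook (mirroring the idea behind `LogicalPlan::map_expressions`, which rewrites only a node's own expressions) replaces field-access chains with fresh column references. The same code path then adds an extraction projection below the node and, for non-projection nodes, a recovery projection above it, following the two-level pattern the doc comments describe. The alias format is borrowed from the doc comment, and `replace_input` is an illustrative stand-in for rebuilding a node with a new child.

```rust
// Toy types for illustration only (hypothetical; not DataFusion's `Expr` / `LogicalPlan`).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    GetField(Box<Expr>, String),
    Literal(String),
    Eq(Box<Expr>, Box<Expr>),
}

#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { columns: Vec<String> },
    Filter { predicate: Expr, input: Box<Plan> },
    Sort { keys: Vec<Expr>, input: Box<Plan> },
    Projection { exprs: Vec<(Expr, String)>, input: Box<Plan> },
}

impl Plan {
    /// Output column names of this node.
    fn columns(&self) -> Vec<String> {
        match self {
            Plan::Scan { columns } => columns.clone(),
            Plan::Filter { input, .. } | Plan::Sort { input, .. } => input.columns(),
            Plan::Projection { exprs, .. } => exprs.iter().map(|(_, n)| n.clone()).collect(),
        }
    }

    /// Apply `f` to this node's own expressions; children are left untouched.
    fn map_expressions(self, f: &mut impl FnMut(Expr) -> Expr) -> Plan {
        match self {
            scan @ Plan::Scan { .. } => scan,
            Plan::Filter { predicate, input } => Plan::Filter { predicate: f(predicate), input },
            Plan::Sort { keys, input } => Plan::Sort { keys: keys.into_iter().map(|k| f(k)).collect(), input },
            Plan::Projection { exprs, input } => {
                Plan::Projection { exprs: exprs.into_iter().map(|(e, n)| (f(e), n)).collect(), input }
            }
        }
    }

    /// Rebuild this single-input node with its child replaced by `g(child)`.
    fn replace_input(self, g: impl FnOnce(Plan) -> Plan) -> Plan {
        match self {
            scan @ Plan::Scan { .. } => scan,
            Plan::Filter { predicate, input } => Plan::Filter { predicate, input: Box::new(g(*input)) },
            Plan::Sort { keys, input } => Plan::Sort { keys, input: Box::new(g(*input)) },
            Plan::Projection { exprs, input } => Plan::Projection { exprs, input: Box::new(g(*input)) },
        }
    }
}

/// A bare column, or a chain of field accesses rooted at a column.
fn is_field_chain(expr: &Expr) -> bool {
    match expr {
        Expr::Column(_) => true,
        Expr::GetField(base, _) => is_field_chain(base),
        _ => false,
    }
}

/// Replace each outermost field-access chain with a fresh alias column, recording `(expr, alias)`.
fn extract_get_fields(expr: Expr, extracted: &mut Vec<(Expr, String)>) -> Expr {
    match expr {
        Expr::GetField(base, field) if is_field_chain(&base) => {
            let alias = format!("__datafusion_extracted_{}", extracted.len() + 1);
            extracted.push((Expr::GetField(base, field), alias.clone()));
            Expr::Column(alias)
        }
        Expr::GetField(base, field) => Expr::GetField(Box::new(extract_get_fields(*base, extracted)), field),
        Expr::Eq(l, r) => {
            Expr::Eq(Box::new(extract_get_fields(*l, extracted)), Box::new(extract_get_fields(*r, extracted)))
        }
        other => other,
    }
}

/// Same code path for every node: rewrite via `map_expressions`, then wrap with projections.
fn extract_leaf_expressions(plan: Plan) -> Plan {
    let original_columns = plan.columns();
    let is_projection = matches!(plan, Plan::Projection { .. });
    let mut extracted = Vec::new();
    let rewritten = plan.map_expressions(&mut |e: Expr| extract_get_fields(e, &mut extracted));
    if extracted.is_empty() {
        return rewritten;
    }
    let rebuilt = rewritten.replace_input(|input| {
        let mut exprs = extracted; // extracted aliases first, then every input column
        exprs.extend(input.columns().into_iter().map(|c| (Expr::Column(c.clone()), c)));
        Plan::Projection { exprs, input: Box::new(input) }
    });
    if is_projection {
        return rebuilt; // a projection already defines its own output schema
    }
    let recovery = original_columns.into_iter().map(|c| (Expr::Column(c.clone()), c)).collect();
    Plan::Projection { exprs: recovery, input: Box::new(rebuilt) }
}
```

Multi-input nodes (such as `Join`) and schema-changing nodes (such as `Aggregate`) are deliberately left out of this sketch; those may be where some per-node handling is still needed.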
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -1295,6 +1297,18 @@ fn rewrite_projection(
predicates: Vec<Expr>,
mut projection: Projection,
) -> Result<(Transformed<LogicalPlan>, Option<Expr>)> {
+ // Note: This check coordinates with ExtractLeafExpressions optimizer rule.
+ // See extract_leaf_expressions.rs for details on why these projections exist.
+ // Don't push filters through extracted expression projections.
+ // Pushing filters through would rewrite expressions like `__datafusion_extracted_1 > 150`
+ // back to `get_field(s,'value') > 150`, undoing the extraction.
+ if is_extracted_expr_projection(&projection) {
Review Comment:
> I'd argue as a general rule there's no point in pushing a filter under a
> projection that is purely column selections / get_field expressions, especially
> if we can't then push it further.
Yes, I agree with this statement.
I don't really have a better suggestion other than perhaps to make the
exception more general: "don't push filters under projections that don't do
computation", etc.
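A minimal sketch of the more general exception, using a hypothetical helper `is_trivial_projection` over a toy `Expr` type (not DataFusion's `Expr`, and not the PR's `is_extracted_expr_projection`). A projection qualifies when every output, ignoring aliases, is a column or a chain of field accesses rooted at a column, so pushing a filter beneath it saves no computation on its own.

```rust
// Toy expression type for illustration only (hypothetical; not DataFusion's `Expr`).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    GetField(Box<Expr>, String),
    Alias(Box<Expr>, String),
    Literal(i64),
    Add(Box<Expr>, Box<Expr>),
}

/// True for a column, or a chain of field accesses rooted at a column,
/// optionally wrapped in an alias. None of these compute new values.
fn is_column_or_field_access(expr: &Expr) -> bool {
    match expr {
        Expr::Column(_) => true,
        Expr::GetField(base, _) => is_column_or_field_access(base),
        Expr::Alias(inner, _) => is_column_or_field_access(inner),
        Expr::Literal(_) | Expr::Add(..) => false,
    }
}

/// Hypothetical check: a projection "does no computation" when every output
/// expression is a column or field access. Under the rule proposed above,
/// filters would stay above such a projection.
fn is_trivial_projection(exprs: &[Expr]) -> bool {
    exprs.iter().all(is_column_or_field_access)
}
```

A real version would presumably classify expressions through the PR's `ExpressionPlacement` rather than by matching a hard-coded field-access variant.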
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.