This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8596812e8f Add the missing equivalence info for filter pushdown (#16686)
8596812e8f is described below

commit 8596812e8f807d743643b5eecfb3a1137460d4eb
Author: Liam Bao <[email protected]>
AuthorDate: Tue Jul 8 17:45:56 2025 -0400

    Add the missing equivalence info for filter pushdown (#16686)
    
    * Reproduce the issue
    
    * Apply the fix
    
    * Side effect
    
    * Improve comments
---
 .../physical_optimizer/filter_pushdown/mod.rs      |   2 +-
 datafusion/datasource/src/source.rs                |  22 +++-
 datafusion/physical-plan/src/filter.rs             |   4 +-
 .../test_files/parquet_filter_pushdown.slt         | 127 +++++++++++++++++++++
 4 files changed, 152 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index f1ef365c92..68369bc9d9 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -289,7 +289,7 @@ fn test_no_pushdown_through_aggregates() {
         Ok:
           - FilterExec: b@1 = bar
           -   CoalesceBatchesExec: target_batch_size=100
-          -     AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt]
+          -     AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0])
           -       CoalesceBatchesExec: target_batch_size=10
           -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
     "
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index 78d7e29560..756b2b4791 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -35,8 +35,11 @@ use crate::file_scan_config::FileScanConfig;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{Constraints, Result, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
+use datafusion_physical_expr::{
+    conjunction, EquivalenceProperties, Partitioning, PhysicalExpr,
+};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_plan::filter::collect_columns_from_predicate;
 use datafusion_physical_plan::filter_pushdown::{
     ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PredicateSupport,
 };
@@ -337,6 +340,9 @@ impl ExecutionPlan for DataSourceExec {
                 new_node.data_source = data_source;
                 new_node.cache =
                     Self::compute_properties(Arc::clone(&new_node.data_source));
+                // Recompute equivalence info using new filters
+                let filter = conjunction(res.filters.collect_supported());
+                new_node = new_node.add_filter_equivalence_info(filter)?;
                 Ok(FilterPushdownPropagation {
                     filters: res.filters,
                     updated_node: Some(Arc::new(new_node)),
@@ -384,6 +390,20 @@ impl DataSourceExec {
         self
     }
 
+    /// Add filters' equivalence info
+    fn add_filter_equivalence_info(
+        mut self,
+        filter: Arc<dyn PhysicalExpr>,
+    ) -> Result<Self> {
+        let (equal_pairs, _) = collect_columns_from_predicate(&filter);
+        for (lhs, rhs) in equal_pairs {
+            self.cache
+                .eq_properties
+                .add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
+        }
+        Ok(self)
+    }
+
     fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
         PlanProperties::new(
             data_source.eq_properties(),
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 54015c7bcd..66886c5766 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -726,7 +726,9 @@ impl RecordBatchStream for FilterExecStream {
 }
 
 /// Return the equals Column-Pairs and Non-equals Column-Pairs
-fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual {
+pub fn collect_columns_from_predicate(
+    predicate: &Arc<dyn PhysicalExpr>,
+) -> EqualAndNonEqual {
     let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
     let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
 
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index e94751548b..24e76a570c 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -113,6 +113,44 @@ physical_plan
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
 
+query T
+select a from t where b = 2 ORDER BY b;
+----
+bar
+
+query T
+select a from t_pushdown where b = 2 ORDER BY b;
+----
+bar
+
+query TT
+EXPLAIN select a from t where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t.a
+02)--Sort: t.b ASC NULLS LAST
+03)----Filter: t.b = Int32(2)
+04)------TableScan: t projection=[a, b], partial_filters=[t.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--ProjectionExec: expr=[a@0 as a]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------FilterExec: b@1 = 2
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
+06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)]
+
+query TT
+EXPLAIN select a from t_pushdown where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t_pushdown.a
+02)--Sort: t_pushdown.b ASC NULLS LAST
+03)----Filter: t_pushdown.b = Int32(2)
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)]
+
 # If we set the setting to `true` it override's the table's setting
 statement ok
 set datafusion.execution.parquet.pushdown_filters = true;
@@ -161,6 +199,40 @@ physical_plan
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
 
+query T
+select a from t where b = 2 ORDER BY b;
+----
+bar
+
+query T
+select a from t_pushdown where b = 2 ORDER BY b;
+----
+bar
+
+query TT
+EXPLAIN select a from t where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t.a
+02)--Sort: t.b ASC NULLS LAST
+03)----Filter: t.b = Int32(2)
+04)------TableScan: t projection=[a, b], partial_filters=[t.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)]
+
+query TT
+EXPLAIN select a from t_pushdown where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t_pushdown.a
+02)--Sort: t_pushdown.b ASC NULLS LAST
+03)----Filter: t_pushdown.b = Int32(2)
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)]
+
 # If we reset the default the table created without pushdown goes back to disabling it
 statement ok
 set datafusion.execution.parquet.pushdown_filters = false;
@@ -212,6 +284,44 @@ physical_plan
 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
 
+query T
+select a from t where b = 2 ORDER BY b;
+----
+bar
+
+query T
+select a from t_pushdown where b = 2 ORDER BY b;
+----
+bar
+
+query TT
+EXPLAIN select a from t where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t.a
+02)--Sort: t.b ASC NULLS LAST
+03)----Filter: t.b = Int32(2)
+04)------TableScan: t projection=[a, b], partial_filters=[t.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--ProjectionExec: expr=[a@0 as a]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------FilterExec: b@1 = 2
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
+06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)]
+
+query TT
+EXPLAIN select a from t_pushdown where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t_pushdown.a
+02)--Sort: t_pushdown.b ASC NULLS LAST
+03)----Filter: t_pushdown.b = Int32(2)
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)]
+
 # When filter pushdown *is* enabled, ParquetExec can filter exactly,
 # not just metadata, so we expect to see no FilterExec
 query T
@@ -239,6 +349,23 @@ physical_plan
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
 06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
 
+query T
+select a from t_pushdown where b = 2 ORDER BY b;
+----
+bar
+
+query TT
+EXPLAIN select a from t_pushdown where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t_pushdown.a
+02)--Sort: t_pushdown.b ASC NULLS LAST
+03)----Filter: t_pushdown.b = Int32(2)
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)]
+
 # also test querying on columns that are not in all the files
 query T
 select a from t_pushdown where b > 2 AND a IS NOT NULL order by a;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to