This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8596812e8f Add the missing equivalence info for filter pushdown (#16686)
8596812e8f is described below
commit 8596812e8f807d743643b5eecfb3a1137460d4eb
Author: Liam Bao <[email protected]>
AuthorDate: Tue Jul 8 17:45:56 2025 -0400
Add the missing equivalence info for filter pushdown (#16686)
* Reproduce the issue
* Apply the fix
* Side effect
* Improve comments
---
.../physical_optimizer/filter_pushdown/mod.rs | 2 +-
datafusion/datasource/src/source.rs | 22 +++-
datafusion/physical-plan/src/filter.rs | 4 +-
.../test_files/parquet_filter_pushdown.slt | 127 +++++++++++++++++++++
4 files changed, 152 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index f1ef365c92..68369bc9d9 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -289,7 +289,7 @@ fn test_no_pushdown_through_aggregates() {
Ok:
- FilterExec: b@1 = bar
- CoalesceBatchesExec: target_batch_size=100
- - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt]
+ - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0])
- CoalesceBatchesExec: target_batch_size=10
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
"
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index 78d7e29560..756b2b4791 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -35,8 +35,11 @@ use crate::file_scan_config::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
+use datafusion_physical_expr::{
+ conjunction, EquivalenceProperties, Partitioning, PhysicalExpr,
+};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_plan::filter::collect_columns_from_predicate;
use datafusion_physical_plan::filter_pushdown::{
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation,
PredicateSupport,
};
@@ -337,6 +340,9 @@ impl ExecutionPlan for DataSourceExec {
new_node.data_source = data_source;
new_node.cache =
Self::compute_properties(Arc::clone(&new_node.data_source));
+ // Recompute equivalence info using new filters
+ let filter = conjunction(res.filters.collect_supported());
+ new_node = new_node.add_filter_equivalence_info(filter)?;
Ok(FilterPushdownPropagation {
filters: res.filters,
updated_node: Some(Arc::new(new_node)),
@@ -384,6 +390,20 @@ impl DataSourceExec {
self
}
+ /// Add filters' equivalence info
+ fn add_filter_equivalence_info(
+ mut self,
+ filter: Arc<dyn PhysicalExpr>,
+ ) -> Result<Self> {
+ let (equal_pairs, _) = collect_columns_from_predicate(&filter);
+ for (lhs, rhs) in equal_pairs {
+ self.cache
+ .eq_properties
+ .add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
+ }
+ Ok(self)
+ }
+
fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
PlanProperties::new(
data_source.eq_properties(),
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 54015c7bcd..66886c5766 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -726,7 +726,9 @@ impl RecordBatchStream for FilterExecStream {
}
/// Return the equals Column-Pairs and Non-equals Column-Pairs
-fn collect_columns_from_predicate(predicate: &Arc<dyn PhysicalExpr>) -> EqualAndNonEqual {
+pub fn collect_columns_from_predicate(
+ predicate: &Arc<dyn PhysicalExpr>,
+) -> EqualAndNonEqual {
let mut eq_predicate_columns = Vec::<PhysicalExprPairRef>::new();
let mut ne_predicate_columns = Vec::<PhysicalExprPairRef>::new();
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index e94751548b..24e76a570c 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -113,6 +113,44 @@ physical_plan
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
03)----DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
projection=[a], file_type=parquet, predicate=b@1 > 2,
pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2,
required_guarantees=[]
+query T
+select a from t where b = 2 ORDER BY b;
+----
+bar
+
+query T
+select a from t_pushdown where b = 2 ORDER BY b;
+----
+bar
+
+query TT
+EXPLAIN select a from t where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t.a
+02)--Sort: t.b ASC NULLS LAST
+03)----Filter: t.b = Int32(2)
+04)------TableScan: t projection=[a, b], partial_filters=[t.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--ProjectionExec: expr=[a@0 as a]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------FilterExec: b@1 = 2
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
+06)----------DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
projection=[a, b], file_type=parquet, predicate=b@1 = 2,
pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <=
b_max@1, required_guarantees=[b in (2)]
+
+query TT
+EXPLAIN select a from t_pushdown where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t_pushdown.a
+02)--Sort: t_pushdown.b ASC NULLS LAST
+03)----Filter: t_pushdown.b = Int32(2)
+04)------TableScan: t_pushdown projection=[a, b],
partial_filters=[t_pushdown.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
projection=[a], file_type=parquet, predicate=b@1 = 2,
pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <=
b_max@1, required_guarantees=[b in (2)]
+
# If we set the setting to `true` it override's the table's setting
statement ok
set datafusion.execution.parquet.pushdown_filters = true;
@@ -161,6 +199,40 @@ physical_plan
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
03)----DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
projection=[a], file_type=parquet, predicate=b@1 > 2,
pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2,
required_guarantees=[]
+query T
+select a from t where b = 2 ORDER BY b;
+----
+bar
+
+query T
+select a from t_pushdown where b = 2 ORDER BY b;
+----
+bar
+
+query TT
+EXPLAIN select a from t where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t.a
+02)--Sort: t.b ASC NULLS LAST
+03)----Filter: t.b = Int32(2)
+04)------TableScan: t projection=[a, b], partial_filters=[t.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
projection=[a], file_type=parquet, predicate=b@1 = 2,
pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <=
b_max@1, required_guarantees=[b in (2)]
+
+query TT
+EXPLAIN select a from t_pushdown where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t_pushdown.a
+02)--Sort: t_pushdown.b ASC NULLS LAST
+03)----Filter: t_pushdown.b = Int32(2)
+04)------TableScan: t_pushdown projection=[a, b],
partial_filters=[t_pushdown.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
projection=[a], file_type=parquet, predicate=b@1 = 2,
pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <=
b_max@1, required_guarantees=[b in (2)]
+
# If we reset the default the table created without pushdown goes back to disabling it
statement ok
set datafusion.execution.parquet.pushdown_filters = false;
@@ -212,6 +284,44 @@ physical_plan
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
03)----DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
projection=[a], file_type=parquet, predicate=b@1 > 2,
pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2,
required_guarantees=[]
+query T
+select a from t where b = 2 ORDER BY b;
+----
+bar
+
+query T
+select a from t_pushdown where b = 2 ORDER BY b;
+----
+bar
+
+query TT
+EXPLAIN select a from t where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t.a
+02)--Sort: t.b ASC NULLS LAST
+03)----Filter: t.b = Int32(2)
+04)------TableScan: t projection=[a, b], partial_filters=[t.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--ProjectionExec: expr=[a@0 as a]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------FilterExec: b@1 = 2
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
+06)----------DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
projection=[a, b], file_type=parquet, predicate=b@1 = 2,
pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <=
b_max@1, required_guarantees=[b in (2)]
+
+query TT
+EXPLAIN select a from t_pushdown where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t_pushdown.a
+02)--Sort: t_pushdown.b ASC NULLS LAST
+03)----Filter: t_pushdown.b = Int32(2)
+04)------TableScan: t_pushdown projection=[a, b],
partial_filters=[t_pushdown.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
projection=[a], file_type=parquet, predicate=b@1 = 2,
pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <=
b_max@1, required_guarantees=[b in (2)]
+
# When filter pushdown *is* enabled, ParquetExec can filter exactly,
# not just metadata, so we expect to see no FilterExec
query T
@@ -239,6 +349,23 @@ physical_plan
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
06)----------DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
projection=[a, b], file_type=parquet, predicate=b@1 > 2,
pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2,
required_guarantees=[]
+query T
+select a from t_pushdown where b = 2 ORDER BY b;
+----
+bar
+
+query TT
+EXPLAIN select a from t_pushdown where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t_pushdown.a
+02)--Sort: t_pushdown.b ASC NULLS LAST
+03)----Filter: t_pushdown.b = Int32(2)
+04)------TableScan: t_pushdown projection=[a, b],
partial_filters=[t_pushdown.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--DataSourceExec: file_groups={2 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]},
projection=[a], file_type=parquet, predicate=b@1 = 2,
pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <=
b_max@1, required_guarantees=[b in (2)]
+
# also test querying on columns that are not in all the files
query T
select a from t_pushdown where b > 2 AND a IS NOT NULL order by a;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]