adriangb commented on code in PR #16686:
URL: https://github.com/apache/datafusion/pull/16686#discussion_r2187232804
##########
datafusion/datasource/src/source.rs:
##########
@@ -325,6 +328,9 @@ impl ExecutionPlan for DataSourceExec {
         new_node.data_source = data_source;
         new_node.cache = Self::compute_properties(Arc::clone(&new_node.data_source));
+        // Add the missing filters' equivalence info when filters pushdown is applied
+        let filter = conjunction(res.filters.collect_supported());
+        new_node = new_node.add_filter_equivalence_info(filter)?;

Review Comment:
   What happens if the node already had equivalence info / old filters? Practically speaking this shouldn't happen, but it is technically possible for a data source to have some filters passed in directly (e.g. in a TableProvider) and some passed in during pushdown. I think this is correct because it only adds the new filters, and we already built the conjunction when constructing the new node.

##########
datafusion/datasource/src/source.rs:
##########
@@ -372,6 +378,20 @@ impl DataSourceExec {
         self
     }

+    /// Add filters' equivalence info
+    fn add_filter_equivalence_info(
+        mut self,
+        filter: Arc<dyn PhysicalExpr>,
+    ) -> Result<Self> {
+        let (equal_pairs, _) = collect_columns_from_predicate(&filter);
+        for (lhs, rhs) in equal_pairs {
+            self.cache
+                .eq_properties
+                .add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
+        }
+        Ok(self)
+    }

Review Comment:
   This part I'm not familiar with; I'd appreciate it if someone else took a look and confirmed it is correct.

##########
datafusion/datasource/src/source.rs:
##########
@@ -325,6 +328,9 @@ impl ExecutionPlan for DataSourceExec {
         new_node.data_source = data_source;
         new_node.cache = Self::compute_properties(Arc::clone(&new_node.data_source));
+        // Add the missing filters' equivalence info when filters pushdown is applied

Review Comment:
   ```suggestion
           // Recompute equivalence info using new filters
   ```

##########
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##########
@@ -239,6 +349,23 @@
 physical_plan
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
 06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
+
+query T
+select a from t_pushdown where b = 2 ORDER BY b;
+----
+bar
+
+query TT
+EXPLAIN select a from t_pushdown where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t_pushdown.a
+02)--Sort: t_pushdown.b ASC NULLS LAST
+03)----Filter: t_pushdown.b = Int32(2)
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)]
+

Review Comment:
   @liamzwbao could you help me understand what I should be looking for in this output, i.e. what proves that this fix works and what would have been different before this fix?
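For anyone else taking a look at the equivalence-info question above, here is a minimal, self-contained sketch of the idea as I understand it. It uses plain stand-in types rather than DataFusion's actual `EquivalenceProperties` / `collect_columns_from_predicate` API, so treat it as an illustration only. The premise, which the new `.slt` plan above appears to demonstrate, is that once a pushed-down literal equality like `b = 2` is recorded on the scan, `ORDER BY b` is trivially satisfied and no SortExec is needed above the `DataSourceExec`.

```rust
// Self-contained sketch (plain Rust, not DataFusion's real types) of the idea:
// pushed-down filters imply equivalences, and a column pinned to a single
// literal makes an ORDER BY on that column trivially satisfied.

/// Stand-in for a pushed-down predicate: either `col = col` or `col = literal`.
#[derive(Debug, Clone)]
enum Predicate {
    ColEqCol(&'static str, &'static str),
    ColEqLit(&'static str, i64),
}

/// Stand-in for the scan's equivalence properties: columns known to be equal
/// to each other, and columns known to be pinned to a single value.
#[derive(Default)]
struct EquivalenceInfo {
    equal_pairs: Vec<(&'static str, &'static str)>,
    constants: Vec<&'static str>,
}

impl EquivalenceInfo {
    /// Rough analogue of `add_filter_equivalence_info`: walk the conjunction of
    /// pushed-down filters and record what they imply about the scan's output.
    fn add_filters(&mut self, filters: &[Predicate]) {
        for f in filters {
            match f {
                Predicate::ColEqCol(l, r) => self.equal_pairs.push((*l, *r)),
                Predicate::ColEqLit(c, _) => self.constants.push(*c),
            }
        }
    }

    /// A column pinned to one value has at most one distinct value, so any row
    /// order already satisfies an ordering requirement on it.
    fn sort_is_redundant(&self, order_by_col: &str) -> bool {
        self.constants.iter().any(|c| *c == order_by_col)
    }
}

fn main() {
    // Roughly `select a from t_pushdown where b = 2 ORDER BY b`, plus a
    // hypothetical col-to-col filter to show where `add_equal_conditions`-style
    // information would come from.
    let pushed_down = vec![Predicate::ColEqLit("b", 2), Predicate::ColEqCol("a", "b")];

    let mut eq = EquivalenceInfo::default();
    eq.add_filters(&pushed_down);

    // With this info recorded on the scan, a planner can treat `ORDER BY b` as
    // already satisfied; presumably that is why the new .slt plan has no SortExec.
    assert!(eq.sort_is_redundant("b"));
    println!("equal pairs recorded: {:?}", eq.equal_pairs);
}
```

Again, this is only a sketch of the mechanism; the real change goes through `EquivalenceProperties::add_equal_conditions` and DataFusion's constant handling, which is exactly the part flagged above as needing a second pair of eyes.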