adriangb commented on code in PR #16686:
URL: https://github.com/apache/datafusion/pull/16686#discussion_r2187232804


##########
datafusion/datasource/src/source.rs:
##########
@@ -325,6 +328,9 @@ impl ExecutionPlan for DataSourceExec {
                 new_node.data_source = data_source;
                 new_node.cache =
                     Self::compute_properties(Arc::clone(&new_node.data_source));
+                // Add the missing filters' equivalence info when filters pushdown is applied
+                let filter = conjunction(res.filters.collect_supported());
+                new_node = new_node.add_filter_equivalence_info(filter)?;

Review Comment:
   What happens if the node already had equivalence info or old filters?
   
   Practically speaking this shouldn't happen, but it is technically possible for a data source to have some filters passed in directly (e.g. in a TableProvider) and some filters passed in during pushdown.
   
   I think this is correct, because it only adds the new filters' equivalence info, and when we built the new node we combined the filters with a conjunction.
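The reasoning above can be illustrated with a toy model, assuming that `add_equal_conditions` merges equivalence classes rather than replacing the existing ones. The sketch below uses a small union-find over column names; `ToyEquivalence`, `find`, and `same_class` are hypothetical stand-ins, not DataFusion's `EquivalenceProperties` API.

```rust
use std::collections::HashMap;

/// Hypothetical toy model of equivalence info: a union-find over column names.
/// Not DataFusion's `EquivalenceProperties`; assumes "add" means "merge".
#[derive(Default)]
struct ToyEquivalence {
    parent: HashMap<String, String>,
}

impl ToyEquivalence {
    /// Return the representative of `x`'s class, compressing the path.
    fn find(&mut self, x: &str) -> String {
        let p = match self.parent.get(x) {
            Some(p) => p.clone(),
            None => x.to_string(),
        };
        if p == x {
            // `x` is its own root (possibly seen for the first time).
            self.parent.insert(x.to_string(), p.clone());
            return p;
        }
        let root = self.find(&p);
        self.parent.insert(x.to_string(), root.clone());
        root
    }

    /// Merge the classes of `lhs` and `rhs`; existing classes are kept.
    fn add_equal_conditions(&mut self, lhs: &str, rhs: &str) {
        let a = self.find(lhs);
        let b = self.find(rhs);
        if a != b {
            self.parent.insert(a, b);
        }
    }

    fn same_class(&mut self, lhs: &str, rhs: &str) -> bool {
        self.find(lhs) == self.find(rhs)
    }
}
```

Under this model, an equality from a filter passed in directly (say `a = b`) survives when pushdown later adds `b = c`, and re-adding `a = b` is a no-op, so layering pushed-down filters on top of existing ones does not corrupt the classes.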



##########
datafusion/datasource/src/source.rs:
##########
@@ -372,6 +378,20 @@ impl DataSourceExec {
         self
     }
 
+    /// Add filters' equivalence info
+    fn add_filter_equivalence_info(
+        mut self,
+        filter: Arc<dyn PhysicalExpr>,
+    ) -> Result<Self> {
+        let (equal_pairs, _) = collect_columns_from_predicate(&filter);
+        for (lhs, rhs) in equal_pairs {
+            self.cache
+                .eq_properties
+                .add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
+        }
+        Ok(self)
+    }

Review Comment:
   I'm not familiar with this part and would appreciate it if someone else could take a look and confirm it is correct.
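As used in the diff, `collect_columns_from_predicate` yields equality pairs from the filter (the second element of the returned tuple is discarded), and each pair is passed to `add_equal_conditions`. A minimal sketch of the pair-collection step follows, over a hypothetical toy `Expr` enum rather than DataFusion's `Arc<dyn PhysicalExpr>`. The assumption that only top-level `AND` conjuncts of the form `x = y` are collected is ours, not confirmed by the diff, and the toy `conjunction` returns `None` for an empty filter list, which may differ from DataFusion's helper.

```rust
/// Hypothetical, simplified predicate tree (DataFusion uses `Arc<dyn PhysicalExpr>`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Eq(Box<Expr>, Box<Expr>),
    Gt(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// Flatten nested ANDs into a flat list of conjuncts.
fn split_conjunction(expr: &Expr) -> Vec<&Expr> {
    match expr {
        Expr::And(l, r) => {
            let mut out = split_conjunction(l);
            out.extend(split_conjunction(r));
            out
        }
        other => vec![other],
    }
}

/// Collect both sides of every top-level `=` conjunct; other conjuncts are ignored.
fn collect_equal_pairs(expr: &Expr) -> Vec<(&Expr, &Expr)> {
    split_conjunction(expr)
        .into_iter()
        .filter_map(|e| match e {
            Expr::Eq(l, r) => Some((&**l, &**r)),
            _ => None,
        })
        .collect()
}

/// Toy stand-in for combining pushed-down filters with AND.
fn conjunction(filters: Vec<Expr>) -> Option<Expr> {
    filters
        .into_iter()
        .reduce(|acc, f| Expr::And(Box::new(acc), Box::new(f)))
}
```

For a pushed-down filter such as `b = 2 AND a > 0`, this yields the single pair `(b, 2)`; the range predicate contributes no equivalence.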



##########
datafusion/datasource/src/source.rs:
##########
@@ -325,6 +328,9 @@ impl ExecutionPlan for DataSourceExec {
                 new_node.data_source = data_source;
                 new_node.cache =
                     Self::compute_properties(Arc::clone(&new_node.data_source));
+                // Add the missing filters' equivalence info when filters pushdown is applied

Review Comment:
   ```suggestion
                   // Recompute equivalence info using new filters
   ```



##########
datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
##########
@@ -239,6 +349,23 @@ physical_plan
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
 06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
 
+query T
+select a from t_pushdown where b = 2 ORDER BY b;
+----
+bar
+
+query TT
+EXPLAIN select a from t_pushdown where b = 2 ORDER BY b;
+----
+logical_plan
+01)Projection: t_pushdown.a
+02)--Sort: t_pushdown.b ASC NULLS LAST
+03)----Filter: t_pushdown.b = Int32(2)
+04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)]
+physical_plan
+01)CoalescePartitionsExec
+02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)]
+

Review Comment:
   @liamzwbao could you help me understand what I should be looking for in this output, what proves that this fix works, and what would have been different before this fix?
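One observable difference in the plan: the logical plan still contains `Sort: t_pushdown.b ASC NULLS LAST`, while the physical plan has only `CoalescePartitionsExec` over `DataSourceExec`, with no sort operator. One plausible reading, assumed here rather than confirmed by the PR, is that once `b = 2` is recorded in the scan's equivalence info, `b` is known to be constant, so the required ordering on `b` is trivially satisfied. The toy check below illustrates that idea; `ordering_satisfied` is hypothetical, and DataFusion's real equivalence machinery is more general.

```rust
use std::collections::HashSet;

/// Hypothetical check: does an input with ordering `existing` satisfy the
/// `required` sort order, given the set of columns known to be constant?
fn ordering_satisfied(required: &[&str], existing: &[&str], constants: &HashSet<&str>) -> bool {
    // A constant column imposes no ordering constraint, so drop it on both sides.
    let req: Vec<&str> = required.iter().copied().filter(|c| !constants.contains(c)).collect();
    let have: Vec<&str> = existing.iter().copied().filter(|c| !constants.contains(c)).collect();
    // What remains of the requirement must be a prefix of the existing ordering.
    req.len() <= have.len() && req.iter().zip(&have).all(|(r, h)| r == h)
}
```

With `b` in the constant set, `ORDER BY b` is satisfied by an unordered scan; without that knowledge, a sort would still be needed.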



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to