Tpt commented on code in PR #18404:
URL: https://github.com/apache/datafusion/pull/18404#discussion_r2487294667


##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -1025,6 +1031,86 @@ impl ExecutionPlan for AggregateExec {
     fn cardinality_effect(&self) -> CardinalityEffect {
         CardinalityEffect::LowerEqual
     }
+
+    fn gather_filters_for_pushdown(
+        &self,
+        _phase: FilterPushdownPhase,
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+        _config: &ConfigOptions,
+    ) -> Result<FilterDescription> {
+        // It's safe to push down filters through aggregates when filters only reference
+        // grouping columns, because such filters determine which groups to compute, not
+        // *how* to compute them. Each group's aggregate values (SUM, COUNT, etc.) are
+        // calculated from the same input rows regardless of whether we filter before or
+        // after grouping - filtering before just eliminates entire groups early.
+        // This optimization is NOT safe for filters on aggregated columns (like filtering on
+        // the result of SUM or COUNT), as those require computing all groups first.
+
+        let grouping_columns: HashSet<_> = self
+            .group_by
+            .expr()
+            .iter()
+            .flat_map(|(expr, _)| collect_columns(expr))
+            .collect();
+
+        // Analyze each filter separately to determine if it can be pushed down
+        let mut safe_filters = Vec::new();
+        let mut unsafe_filters = Vec::new();
+
+        for filter in parent_filters {
+            let filter_columns: HashSet<_> =
+                collect_columns(&filter).into_iter().collect();
+
+            // Check if this filter references non-grouping columns
+            let references_non_grouping = !grouping_columns.is_empty()
+                && !filter_columns.is_subset(&grouping_columns);
+
+            if references_non_grouping {
+                unsafe_filters.push(filter);
+                continue;
+            }
+
+            // For GROUPING SETS, verify this filter's columns appear in all grouping sets
+            if self.group_by.groups().len() > 1 {
+                let filter_column_indices: HashSet<usize> = filter_columns

Review Comment:
   I guess deduplication is not needed here, since `filter_columns` is already a `HashSet`, so a `Vec` should be enough:
   ```suggestion
                   let filter_column_indices: Vec<usize> = filter_columns
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to