alamb commented on code in PR #18404:
URL: https://github.com/apache/datafusion/pull/18404#discussion_r2487815292


##########
datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs:
##########
@@ -1892,3 +1891,445 @@ fn col_lit_predicate(
         Arc::new(Literal::new(scalar_value)),
     ))
 }
+
+#[tokio::test]
+async fn test_aggregate_filter_pushdown() {
+    // Test that filters can pass through AggregateExec even with aggregate functions
+    // when the filter references grouping columns
+    // Simulates: SELECT a, COUNT(b) FROM table WHERE a = 'x' GROUP BY a
+
+    let batches =
+        vec![
+            record_batch!(("a", Utf8, ["x", "y"]), ("b", Utf8, ["foo", "bar"])).unwrap(),
+        ];
+
+    let scan = TestScanBuilder::new(schema())
+        .with_support(true)
+        .with_batches(batches)
+        .build();
+
+    // Create an aggregate: GROUP BY a with COUNT(b)
+    let group_by = PhysicalGroupBy::new_single(vec![(
+        col("a", &schema()).unwrap(),
+        "a".to_string(),
+    )]);
+
+    // Add COUNT aggregate
+    let count_expr =
+        AggregateExprBuilder::new(count_udaf(), vec![col("b", &schema()).unwrap()])
+            .schema(schema())
+            .alias("count")
+            .build()
+            .unwrap();
+
+    let aggregate = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Partial,
+            group_by,
+            vec![count_expr.into()], // Has aggregate function
+            vec![None],              // No filter on the aggregate function
+            Arc::clone(&scan),
+            schema(),
+        )
+        .unwrap(),
+    );
+
+    // Add a filter on the grouping column 'a'
+    let predicate = col_lit_predicate("a", "x", &schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, aggregate).unwrap())
+        as Arc<dyn ExecutionPlan>;
+
+    // Even with aggregate functions, filter on grouping column should be pushed through
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = x
+        -   AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count]
+        -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count], ordering_mode=Sorted
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = x
+    "
+    );
+}
+
+#[tokio::test]
+async fn test_no_pushdown_filter_on_aggregate_result() {
+    // Test that filters on aggregate results (not grouping columns) are NOT pushed through

Review Comment:
   ❤️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to