This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 048374c91b add test for multi-column topk dynamic filter pushdown 
(#17162)
048374c91b is described below

commit 048374c91b19e4a6923dc844881fda9dd21a12ae
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Wed Aug 13 21:09:46 2025 -0500

    add test for multi-column topk dynamic filter pushdown (#17162)
---
 .../physical_optimizer/filter_pushdown/mod.rs      | 96 ++++++++++++++++++++++
 1 file changed, 96 insertions(+)

diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs 
b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index 1639960fde..1a04753966 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -24,6 +24,7 @@ use arrow::{
 };
 use arrow_schema::SortOptions;
 use datafusion::{
+    assert_batches_eq,
     logical_expr::Operator,
     physical_plan::{
         expressions::{BinaryExpr, Column, Literal},
@@ -736,6 +737,101 @@ async fn test_topk_dynamic_filter_pushdown() {
     );
 }
 
+#[tokio::test]
+async fn test_topk_dynamic_filter_pushdown_multi_column_sort() {
+    let batches = vec![
+        // The plan below sorts by b ASC NULLS LAST, then a DESC NULLS FIRST, with fetch=2.
+        // The values are arranged so that the first batch alone is enough to fill 
the TopK
+        // and both rows of the second batch sort after the first batch's rows (larger b values).
+        record_batch!(
+            ("a", Utf8, ["ac", "ad"]),
+            ("b", Utf8, ["bb", "ba"]),
+            ("c", Float64, [2.0, 1.0])
+        )
+        .unwrap(),
+        record_batch!(
+            ("a", Utf8, ["aa", "ab"]),
+            ("b", Utf8, ["bc", "bd"]),
+            ("c", Float64, [1.0, 2.0])
+        )
+        .unwrap(),
+    ];
+    let scan = TestScanBuilder::new(schema())
+        .with_support(true)
+        .with_batches(batches)
+        .build();
+    let plan = Arc::new(
+        SortExec::new(
+            LexOrdering::new(vec![
+                PhysicalSortExpr::new(
+                    col("b", &schema()).unwrap(),
+                    SortOptions::default().asc().nulls_last(),
+                ),
+                PhysicalSortExpr::new(
+                    col("a", &schema()).unwrap(),
+                    SortOptions::default().desc().nulls_first(),
+                ),
+            ])
+            .unwrap(),
+            Arc::clone(&scan),
+        )
+        .with_fetch(Some(2)),
+    ) as Arc<dyn ExecutionPlan>;
+
+    // Expect the dynamic filter to be pushed down into the DataSourceExec; before execution the snapshot shows it as the placeholder `true`
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), 
FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], 
preserve_partitioning=[false]
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], 
preserve_partitioning=[false]
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true, 
predicate=DynamicFilterPhysicalExpr [ true ]
+    "
+    );
+
+    // Actually apply the optimization to the plan and put some data through 
it to check that the filter is updated to reflect the TopK state
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+    let config = SessionConfig::new().with_batch_size(2);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+    // Pull the first output batch; it should hold the two top rows, which both come from the first input batch
+    let res = stream.next().await.unwrap().unwrap();
+    #[rustfmt::skip]
+    let expected = [
+        "+----+----+-----+",
+        "| a  | b  | c   |",
+        "+----+----+-----+",
+        "| ad | ba | 1.0 |",
+        "| ac | bb | 2.0 |",
+        "+----+----+-----+",
+    ];
+    assert_batches_eq!(expected, &[res]);
+    // Now check that SortExec and DataSourceExec both display the updated filter, whose literals match the last expected row (a = ac, b = bb)
+    insta::assert_snapshot!(
+        format!("{}", format_plan_for_test(&plan)),
+        @r"
+    - SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], 
preserve_partitioning=[false], filter=[b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR 
a@0 > ac)]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true, 
predicate=DynamicFilterPhysicalExpr [ b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR 
a@0 > ac) ]
+    "
+    );
+    // The stream must be exhausted: fetch=2 was already satisfied by the single batch above
+    assert!(stream.next().await.is_none());
+}
+
 #[tokio::test]
 async fn test_hashjoin_dynamic_filter_pushdown() {
     use datafusion_common::JoinType;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to