This is an automated email from the ASF dual-hosted git repository.
adriangb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 048374c91b add test for multi-column topk dynamic filter pushdown (#17162)
048374c91b is described below
commit 048374c91b19e4a6923dc844881fda9dd21a12ae
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Wed Aug 13 21:09:46 2025 -0500
add test for multi-column topk dynamic filter pushdown (#17162)
---
.../physical_optimizer/filter_pushdown/mod.rs | 96 ++++++++++++++++++++++
1 file changed, 96 insertions(+)
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index 1639960fde..1a04753966 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -24,6 +24,7 @@ use arrow::{
};
use arrow_schema::SortOptions;
use datafusion::{
+ assert_batches_eq,
logical_expr::Operator,
physical_plan::{
expressions::{BinaryExpr, Column, Literal},
@@ -736,6 +737,101 @@ async fn test_topk_dynamic_filter_pushdown() {
);
}
+#[tokio::test]
+async fn test_topk_dynamic_filter_pushdown_multi_column_sort() {
+ let batches = vec![
+ // We are going to do ORDER BY b ASC NULLS LAST, a DESC
+ // And we put the values in such a way that the first batch will fill the TopK
+ // and we skip the second batch.
+ record_batch!(
+ ("a", Utf8, ["ac", "ad"]),
+ ("b", Utf8, ["bb", "ba"]),
+ ("c", Float64, [2.0, 1.0])
+ )
+ .unwrap(),
+ record_batch!(
+ ("a", Utf8, ["aa", "ab"]),
+ ("b", Utf8, ["bc", "bd"]),
+ ("c", Float64, [1.0, 2.0])
+ )
+ .unwrap(),
+ ];
+ let scan = TestScanBuilder::new(schema())
+ .with_support(true)
+ .with_batches(batches)
+ .build();
+ let plan = Arc::new(
+ SortExec::new(
+ LexOrdering::new(vec![
+ PhysicalSortExpr::new(
+ col("b", &schema()).unwrap(),
+ SortOptions::default().asc().nulls_last(),
+ ),
+ PhysicalSortExpr::new(
+ col("a", &schema()).unwrap(),
+ SortOptions::default().desc().nulls_first(),
+ ),
+ ])
+ .unwrap(),
+ Arc::clone(&scan),
+ )
+ .with_fetch(Some(2)),
+ ) as Arc<dyn ExecutionPlan>;
+
+ // expect the predicate to be pushed down into the DataSource
+ insta::assert_snapshot!(
+ OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
+ "
+ );
+
+ // Actually apply the optimization to the plan and put some data through it to check that the filter is updated to reflect the TopK state
+ let mut config = ConfigOptions::default();
+ config.execution.parquet.pushdown_filters = true;
+ let plan = FilterPushdown::new_post_optimization()
+ .optimize(plan, &config)
+ .unwrap();
+ let config = SessionConfig::new().with_batch_size(2);
+ let session_ctx = SessionContext::new_with_config(config);
+ session_ctx.register_object_store(
+ ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+ Arc::new(InMemory::new()),
+ );
+ let state = session_ctx.state();
+ let task_ctx = state.task_ctx();
+ let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+ // Iterate one batch
+ let res = stream.next().await.unwrap().unwrap();
+ #[rustfmt::skip]
+ let expected = [
+ "+----+----+-----+",
+ "| a | b | c |",
+ "+----+----+-----+",
+ "| ad | ba | 1.0 |",
+ "| ac | bb | 2.0 |",
+ "+----+----+-----+",
+ ];
+ assert_batches_eq!(expected, &[res]);
+ // Now check what our filter looks like
+ insta::assert_snapshot!(
+ format!("{}", format_plan_for_test(&plan)),
+ @r"
+ - SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false], filter=[b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac) ]
+ "
+ );
+ // There should be no more batches
+ assert!(stream.next().await.is_none());
+}
+
#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown() {
use datafusion_common::JoinType;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]