xudong963 commented on code in PR #18046:
URL: https://github.com/apache/datafusion/pull/18046#discussion_r2428037975
##########
datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs:
##########
@@ -834,6 +840,132 @@ async fn test_topk_dynamic_filter_pushdown_multi_column_sort() {
assert!(stream.next().await.is_none());
}
+/// TopK dynamic-filter pushdown through `CoalescePartitionsExec`: after
+/// `FilterPushdown::new_post_optimization()`, the two-group `DataSourceExec`
+/// below the coalesce must carry `predicate=DynamicFilter [ empty ]`.
+#[tokio::test]
+async fn test_topk_filter_passes_through_coalesce_partitions() {
+ // Input batches for the test source. NOTE(review): how TestSource maps these
+ // onto the two file groups below is not visible here — confirm if it matters.
+ let batches = vec![
+ record_batch!(
+ ("a", Utf8, ["aa", "ab"]),
+ ("b", Utf8, ["bd", "bc"]),
+ ("c", Float64, [1.0, 2.0])
+ )
+ .unwrap(),
+ record_batch!(
+ ("a", Utf8, ["ac", "ad"]),
+ ("b", Utf8, ["bb", "ba"]),
+ ("c", Float64, [2.0, 1.0])
+ )
+ .unwrap(),
+ ];
+
+ // Test source with filter pushdown enabled (rendered as
+ // `pushdown_supported=true` in the plans below) that yields `batches`.
+ let source = Arc::new(TestSource::new(true, batches));
+
+ let base_config = FileScanConfigBuilder::new(
+ ObjectStoreUrl::parse("test://").unwrap(),
+ Arc::clone(&schema()),
+ source,
+ )
+ .with_file_groups(vec![
+ // Partition 0
+ FileGroup::new(vec![PartitionedFile::new("test1.parquet", 123)]),
+ // Partition 1
+ FileGroup::new(vec![PartitionedFile::new("test2.parquet", 123)]),
+ ])
+ .build();
+
+ let scan = DataSourceExec::from_data_source(base_config);
+
+ // Merge the two scan partitions into a single output partition.
+ let coalesce = Arc::new(CoalescePartitionsExec::new(scan)) as Arc<dyn ExecutionPlan>;
+
+ // TopK on top: sort by `b` DESC NULLS LAST and keep only the first row.
+ let plan = Arc::new(
+ SortExec::new(
+ LexOrdering::new(vec![PhysicalSortExpr::new(
+ col("b", &schema()).unwrap(),
+ SortOptions::new(true, false),
+ )])
+ .unwrap(),
+ coalesce,
+ )
+ .with_fetch(Some(1)),
+ ) as Arc<dyn ExecutionPlan>;
+
+ // The TopK filter SHOULD pass through CoalescePartitionsExec if it properly
+ // implements from_children (not all_unsupported); only the output plan's
+ // DataSourceExec gains `predicate=DynamicFilter [ empty ]`.
+ insta::assert_snapshot!(
+ OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalescePartitionsExec
+ - DataSourceExec: file_groups={2 groups: [[test1.parquet], [test2.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalescePartitionsExec
+ - DataSourceExec: file_groups={2 groups: [[test1.parquet], [test2.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
+ "
+ );
+}
+
+#[tokio::test]
+async fn test_topk_filter_passes_through_coalesce_batches() {
+ let batches = vec![
+ record_batch!(
+ ("a", Utf8, ["aa", "ab"]),
+ ("b", Utf8, ["bd", "bc"]),
+ ("c", Float64, [1.0, 2.0])
+ )
+ .unwrap(),
+ record_batch!(
+ ("a", Utf8, ["ac", "ad"]),
+ ("b", Utf8, ["bb", "ba"]),
+ ("c", Float64, [2.0, 1.0])
+ )
+ .unwrap(),
+ ];
+
+ let scan = TestScanBuilder::new(schema())
+ .with_support(true)
+ .with_batches(batches)
+ .build();
+
+ let coalesce_batches =
+ Arc::new(CoalesceBatchesExec::new(scan, 1024)) as Arc<dyn
ExecutionPlan>;
+
+ // Add SortExec with TopK
+ let plan = Arc::new(
+ SortExec::new(
+ LexOrdering::new(vec![PhysicalSortExpr::new(
+ col("b", &schema()).unwrap(),
+ SortOptions::new(true, false),
+ )])
+ .unwrap(),
+ coalesce_batches,
+ )
+ .with_fetch(Some(1)),
+ ) as Arc<dyn ExecutionPlan>;
+
+ insta::assert_snapshot!(
+ OptimizationTest::new(Arc::clone(&plan),
FilterPushdown::new_post_optimization(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST],
preserve_partitioning=[false]
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST],
preserve_partitioning=[false]
+ - CoalesceBatchesExec: target_batch_size=1024
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ empty ]
Review Comment:
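Meanwhile, I added a test for `CoalesceBatchesExec`.
The key change is `predicate=DynamicFilter [ empty ]` on the `DataSourceExec` in the optimized output: the TopK dynamic filter is now pushed through to the scan.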
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]