gene-bordegaray commented on code in PR #20331:
URL: https://github.com/apache/datafusion/pull/20331#discussion_r2804905205
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -1291,6 +1301,194 @@ async fn
test_hashjoin_dynamic_filter_pushdown_partitioned() {
);
}
+#[tokio::test]
+async fn test_partitioned_hashjoin_no_repartition_dynamic_filter_pushdown() {
+ use datafusion_common::JoinType;
+
+ let build_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Float64, false),
+ ]));
+
+ let probe_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("e", DataType::Float64, false),
+ ]));
+
+ // Build side: different data per partition so bounds differ.
+ // Partition 0: ("aa", "ba") -> bounds a:[aa,aa], b:[ba,ba]
+ // Partition 1: ("zz", "zz") -> bounds a:[zz,zz], b:[zz,zz]
+ let build_p0 = vec![
+ record_batch!(
+ ("a", Utf8, ["aa"]),
+ ("b", Utf8, ["ba"]),
+ ("c", Float64, [1.0])
+ )
+ .unwrap(),
+ ];
+ let build_p1 = vec![
+ record_batch!(
+ ("a", Utf8, ["zz"]),
+ ("b", Utf8, ["zz"]),
+ ("c", Float64, [2.0])
+ )
+ .unwrap(),
+ ];
+
+ // Probe side: each partition has a mix of matching and non-matching rows.
+ // Partition 0: ("aa","ba") matches p0 bounds, ("zz","zz") does not
+ // Partition 1: ("zz","zz") matches p1 bounds, ("aa","ba") does not
+ let probe_p0 = vec![
+ record_batch!(
+ ("a", Utf8, ["aa", "zz"]),
+ ("b", Utf8, ["ba", "zz"]),
+ ("e", Float64, [10.0, 20.0])
+ )
+ .unwrap(),
+ ];
+ let probe_p1 = vec![
+ record_batch!(
+ ("a", Utf8, ["zz", "aa"]),
+ ("b", Utf8, ["zz", "ba"]),
+ ("e", Float64, [30.0, 40.0])
+ )
+ .unwrap(),
+ ];
+
+ // Use 2 file groups per side (2 partitions) with no RepartitionExec.
+ // This triggers partition-index routing and simulates what happens when
+ // `preserve_file_partitions` is enabled and declares Hash partitioning on
the DataSourceExec.
+ let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+ .with_support(true)
+ .with_batches_for_partition(build_p0)
+ .with_batches_for_partition(build_p1)
+ .with_file_group(FileGroup::new(vec![PartitionedFile::new(
+ "build_0.parquet",
+ 123,
+ )]))
+ .with_file_group(FileGroup::new(vec![PartitionedFile::new(
+ "build_1.parquet",
+ 123,
+ )]))
+ .build();
+
+ let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+ .with_support(true)
+ .with_batches_for_partition(probe_p0)
+ .with_batches_for_partition(probe_p1)
+ .with_file_group(FileGroup::new(vec![PartitionedFile::new(
+ "probe_0.parquet",
+ 123,
+ )]))
+ .with_file_group(FileGroup::new(vec![PartitionedFile::new(
+ "probe_1.parquet",
+ 123,
+ )]))
+ .build();
+
+ let on = vec![
+ (
+ col("a", &build_side_schema).unwrap(),
+ col("a", &probe_side_schema).unwrap(),
+ ),
+ (
+ col("b", &build_side_schema).unwrap(),
+ col("b", &probe_side_schema).unwrap(),
+ ),
+ ];
+ let hash_join = Arc::new(
+ HashJoinExecBuilder::new(
+ build_scan,
+ Arc::clone(&probe_scan),
+ on,
+ JoinType::Inner,
+ )
+ .with_partition_mode(PartitionMode::Partitioned)
+ .with_null_equality(datafusion_common::NullEquality::NullEqualsNothing)
+
.with_dynamic_filter_routing_mode(DynamicFilterRoutingMode::PartitionIndex)
+ .build()
+ .unwrap(),
+ );
+
+ // Top-level CoalescePartitionsExec
+ let cp = Arc::new(CoalescePartitionsExec::new(hash_join)) as Arc<dyn
ExecutionPlan>;
+ // Add a sort for deterministic output
+ let plan = Arc::new(SortExec::new(
+ LexOrdering::new(vec![PhysicalSortExpr::new(
+ col("a", &probe_side_schema).unwrap(),
+ SortOptions::new(true, false), // descending, nulls_first
+ )])
+ .unwrap(),
+ cp,
+ )) as Arc<dyn ExecutionPlan>;
+
+ // expect the predicate to be pushed down into the probe side DataSource
+ insta::assert_snapshot!(
+ OptimizationTest::new(Arc::clone(&plan),
FilterPushdown::new_post_optimization(), true),
+ @r"
Review Comment:
ya I thought similarly. I wouldn't be opposed. I will let @adriangb or
@NGA-TRAN weigh in their thoughs
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]