LiaCastaneda commented on code in PR #20331:
URL: https://github.com/apache/datafusion/pull/20331#discussion_r2804664690
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -1291,6 +1301,194 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
);
}
+#[tokio::test]
+async fn test_partitioned_hashjoin_no_repartition_dynamic_filter_pushdown() {
+ use datafusion_common::JoinType;
+
+ let build_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Float64, false),
+ ]));
+
+ let probe_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("e", DataType::Float64, false),
+ ]));
+
+ // Build side: different data per partition so bounds differ.
+ // Partition 0: ("aa", "ba") -> bounds a:[aa,aa], b:[ba,ba]
+ // Partition 1: ("zz", "zz") -> bounds a:[zz,zz], b:[zz,zz]
+ let build_p0 = vec![
+ record_batch!(
+ ("a", Utf8, ["aa"]),
+ ("b", Utf8, ["ba"]),
+ ("c", Float64, [1.0])
+ )
+ .unwrap(),
+ ];
+ let build_p1 = vec![
+ record_batch!(
+ ("a", Utf8, ["zz"]),
+ ("b", Utf8, ["zz"]),
+ ("c", Float64, [2.0])
+ )
+ .unwrap(),
+ ];
+
+ // Probe side: each partition has a mix of matching and non-matching rows.
+ // Partition 0: ("aa","ba") matches p0 bounds, ("zz","zz") does not
+ // Partition 1: ("zz","zz") matches p1 bounds, ("aa","ba") does not
+ let probe_p0 = vec![
+ record_batch!(
+ ("a", Utf8, ["aa", "zz"]),
+ ("b", Utf8, ["ba", "zz"]),
+ ("e", Float64, [10.0, 20.0])
+ )
+ .unwrap(),
+ ];
+ let probe_p1 = vec![
+ record_batch!(
+ ("a", Utf8, ["zz", "aa"]),
+ ("b", Utf8, ["zz", "ba"]),
+ ("e", Float64, [30.0, 40.0])
+ )
+ .unwrap(),
+ ];
+
+ // Use 2 file groups per side (2 partitions) with no RepartitionExec.
+ // This triggers partition-index routing and simulates what happens when
+ // `preserve_file_partitions` is enabled and declares Hash partitioning on the DataSourceExec.
+ let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+ .with_support(true)
+ .with_batches_for_partition(build_p0)
+ .with_batches_for_partition(build_p1)
+ .with_file_group(FileGroup::new(vec![PartitionedFile::new(
+ "build_0.parquet",
+ 123,
+ )]))
+ .with_file_group(FileGroup::new(vec![PartitionedFile::new(
+ "build_1.parquet",
+ 123,
+ )]))
+ .build();
+
+ let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+ .with_support(true)
+ .with_batches_for_partition(probe_p0)
+ .with_batches_for_partition(probe_p1)
+ .with_file_group(FileGroup::new(vec![PartitionedFile::new(
+ "probe_0.parquet",
+ 123,
+ )]))
+ .with_file_group(FileGroup::new(vec![PartitionedFile::new(
+ "probe_1.parquet",
+ 123,
+ )]))
+ .build();
+
+ let on = vec![
+ (
+ col("a", &build_side_schema).unwrap(),
+ col("a", &probe_side_schema).unwrap(),
+ ),
+ (
+ col("b", &build_side_schema).unwrap(),
+ col("b", &probe_side_schema).unwrap(),
+ ),
+ ];
+ let hash_join = Arc::new(
+ HashJoinExecBuilder::new(
+ build_scan,
+ Arc::clone(&probe_scan),
+ on,
+ JoinType::Inner,
+ )
+ .with_partition_mode(PartitionMode::Partitioned)
+ .with_null_equality(datafusion_common::NullEquality::NullEqualsNothing)
+ .with_dynamic_filter_routing_mode(DynamicFilterRoutingMode::PartitionIndex)
+ .build()
+ .unwrap(),
+ );
+
+ // Top-level CoalescePartitionsExec
+ let cp = Arc::new(CoalescePartitionsExec::new(hash_join)) as Arc<dyn ExecutionPlan>;
+ // Add a sort for deterministic output
+ let plan = Arc::new(SortExec::new(
+ LexOrdering::new(vec![PhysicalSortExpr::new(
+ col("a", &probe_side_schema).unwrap(),
+ SortOptions::new(true, false), // descending: true, nulls_first: false
+ )])
+ .unwrap(),
+ cp,
+ )) as Arc<dyn ExecutionPlan>;
+
+ // expect the predicate to be pushed down into the probe side DataSource
+ insta::assert_snapshot!(
+ OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
+ @r"
Review Comment:
super nit: would it be enough to assert only on the plan after execution for
these kinds of tests? This file is becoming increasingly large.
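
For illustration only (not part of the PR): a minimal sketch of what the execution-only assertion suggested above could look like. It assumes the `plan` (`Arc<dyn ExecutionPlan>`) built in the test and a fresh `SessionContext`; `collect` and `pretty_format_batches` are the standard DataFusion/Arrow helpers, though the test harness in this file may expose its own utilities for this.

```rust
// Hypothetical execution-only check: run the optimized plan and snapshot the
// resulting rows instead of the full optimizer output.
use arrow::util::pretty::pretty_format_batches;
use datafusion::physical_plan::collect;
use datafusion::prelude::SessionContext;

let ctx = SessionContext::new();
// `plan` is the Arc<dyn ExecutionPlan> built above (SortExec over the join).
let batches = collect(Arc::clone(&plan), ctx.task_ctx()).await.unwrap();
insta::assert_snapshot!(pretty_format_batches(&batches).unwrap().to_string());
```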
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1454,21 +1481,86 @@ pub fn ensure_distribution(
plan.with_new_children(children_plans)?
};
+ // For partitioned hash joins, decide dynamic filter routing mode.
+ //
+ // Dynamic filtering requires matching partitioning schemes on both sides:
+ // - PartitionIndex: Both sides use file-grouped partitioning (value-based).
+ // Partition i on build corresponds to partition i on probe by partition value.
+ // - CaseHash: Both sides use hash repartitioning (hash-based).
+ // Uses CASE expression with hash(row) % N to route to correct partition filter.
+ //
+ // NOTE: If partitioning schemes are misaligned (one file-grouped, one hash-repartitioned),
+ // the partitioned join itself is incorrect.
+ // Partition assignments don't match:
+ // - File-grouped: partition 0 = all rows where column="A" (value-based)
+ // - Hash-repartitioned: partition 0 = all rows where hash(column) % N == 0 (hash-based)
+ // These are incompatible, so the join will miss matching rows.
+ plan = if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>()
+ && matches!(hash_join.mode, PartitionMode::Partitioned)
+ {
+ let routing_mode = match (
+ children[0].data.repartitioned,
+ children[1].data.repartitioned,
+ ) {
+ (false, false) => DynamicFilterRoutingMode::PartitionIndex,
+ (true, true) => DynamicFilterRoutingMode::CaseHash,
+ _ => {
+ // Misaligned partitioning schemes
+ return plan_err!(
+ "Partitioned hash join has incompatible partitioning
schemes: \
+ left side is {}, right side is {}.",
+ if children[0].data.repartitioned {
+ "hash-repartitioned"
+ } else {
+ "file-grouped"
+ },
+ if children[1].data.repartitioned {
+ "hash-repartitioned"
+ } else {
+ "file-grouped"
+ }
Review Comment:
Yeah, I think it's a good idea.
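
For context only (not part of the PR): a self-contained toy sketch of the incompatibility described in the comment block above. The key values and the two-partition setup are made up, and `DefaultHasher` stands in for DataFusion's repartition hash; the point is only that a value-based (file-grouped) assignment and a hash-based assignment generally put the same key into different partition indexes.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hash-based partition assignment (stand-in for hash repartitioning).
fn hash_partition(key: &str, num_partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % num_partitions
}

fn main() {
    // Value-based (file-grouped) assignment: file group 0 holds all rows with
    // a = "aa", file group 1 holds all rows with a = "zz".
    let file_grouped = [("aa", 0u64), ("zz", 1u64)];
    for (key, value_partition) in file_grouped {
        let hash_based = hash_partition(key, 2);
        // Whenever these two indexes differ, a partitioned join that mixes the
        // schemes pairs build partition i with a probe partition that does not
        // contain the matching rows, so matches are silently dropped.
        println!("{key}: file-grouped -> {value_partition}, hash-based -> {hash_based}");
    }
}
```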
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]