NGA-TRAN commented on code in PR #20246:
URL: https://github.com/apache/datafusion/pull/20246#discussion_r2784568106
##########
datafusion/sqllogictest/test_files/preserve_file_partitioning.slt:
##########
@@ -101,6 +101,29 @@ STORED AS PARQUET;
----
4
+# Create hive-partitioned dimension table (3 partitions matching fact_table)
+# For testing Partitioned joins with matching partition counts
+query I
+COPY (SELECT 'dev' as env, 'log' as service)
+TO
'test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT 'prod' as env, 'log' as service)
+TO
'test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet'
+STORED AS PARQUET;
+----
+1
+
+query I
+COPY (SELECT 'prod' as env, 'log' as service)
Review Comment:
Isn't this exactly the same as the `d_dkey=B` above? Do you mean to modify
the value?
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -5698,4 +5730,256 @@ mod tests {
.contains("null_aware anti join only supports single column
join key")
);
}
+
+ #[test]
+ fn test_has_hash_repartition() {
+ let left = build_table(
+ ("a1", &vec![1, 2]),
+ ("b1", &vec![4, 5]),
+ ("c1", &vec![7, 8]),
+ );
+
+ // DataSource -> RepartitionExec(Hash) should return true
+ let schema = left.schema();
+ let hash_exprs =
+ vec![Arc::new(Column::new_with_schema("b1", &schema).unwrap()) as
_];
+ let hash_repartition = Arc::new(
+ RepartitionExec::try_new(
+ Arc::clone(&left),
+ Partitioning::Hash(hash_exprs.clone(), 4),
+ )
+ .unwrap(),
+ );
+ assert!(
+ HashJoinExec::has_hash_repartition(
+ &(hash_repartition as Arc<dyn ExecutionPlan>)
+ ),
+ "should detect Hash RepartitionExec"
+ );
+
+ // DataSource -> RepartitionExec(RoundRobin) should return false
+ let round_robin_repartition = Arc::new(
+ RepartitionExec::try_new(Arc::clone(&left),
Partitioning::RoundRobinBatch(4))
+ .unwrap(),
+ );
+ assert!(
+ !HashJoinExec::has_hash_repartition(
+ &(round_robin_repartition as Arc<dyn ExecutionPlan>)
+ ),
+ "RoundRobin should not be detected as hash repartition"
+ );
+
+ // DataSource -> RepartitionExec(UnknownPartitioning) should return
false
+ let unknown_repartition = Arc::new(
+ RepartitionExec::try_new(
+ Arc::clone(&left),
+ Partitioning::UnknownPartitioning(4),
+ )
+ .unwrap(),
+ );
+ assert!(
+ !HashJoinExec::has_hash_repartition(
+ &(unknown_repartition as Arc<dyn ExecutionPlan>)
+ ),
+ "UnknownPartitioning should not be detected as hash repartition"
+ );
+
+ // Plain DataSource (no RepartitionExec) should return false
+ assert!(
+ !HashJoinExec::has_hash_repartition(&left),
+ "plan without RepartitionExec should return false"
+ );
+
+ // CoalescePartitionsExec -> RepartitionExec(Hash) should return true
+ let coalesce = Arc::new(CoalescePartitionsExec::new(Arc::new(
+ RepartitionExec::try_new(
+ Arc::clone(&left),
+ Partitioning::Hash(hash_exprs.clone(), 4),
+ )
+ .unwrap(),
+ ))) as Arc<dyn ExecutionPlan>;
+ assert!(
+ HashJoinExec::has_hash_repartition(&coalesce),
+ "should traverse through CoalescePartitionsExec to find Hash
RepartitionExec"
+ );
+
+ // CoalescePartitionsExec -> DataSource (no RepartitionExec) should
return false
+ let coalesce_no_repart =
Arc::new(CoalescePartitionsExec::new(Arc::clone(&left)))
+ as Arc<dyn ExecutionPlan>;
+ assert!(
+ !HashJoinExec::has_hash_repartition(&coalesce_no_repart),
+ "should return false when traversing through
CoalescePartitionsExec with no RepartitionExec"
+ );
+
+ // Multiple single-child operators -> RepartitionExec(Hash) should
return true
+ let deep_chain = Arc::new(CoalescePartitionsExec::new(Arc::new(
+ CoalescePartitionsExec::new(Arc::new(
+ RepartitionExec::try_new(
+ Arc::clone(&left),
+ Partitioning::Hash(hash_exprs.clone(), 4),
+ )
+ .unwrap(),
+ )),
+ ))) as Arc<dyn ExecutionPlan>;
+ assert!(
+ HashJoinExec::has_hash_repartition(&deep_chain),
+ "should traverse through multiple single-child operators to find
Hash RepartitionExec"
+ );
+
+ // UnionExec with no RepartitionExec on either branch should return
false
+ let union_no_repart =
+ crate::union::UnionExec::try_new(vec![Arc::clone(&left),
Arc::clone(&left)])
+ .unwrap();
+ assert!(
+ !HashJoinExec::has_hash_repartition(&union_no_repart),
+ "UnionExec with no RepartitionExec on any branch should return
false"
+ );
+
+ // UnionExec with RepartitionExec(Hash) on one branch should return
true
+ let hash_branch = Arc::new(
+ RepartitionExec::try_new(
+ Arc::clone(&left),
+ Partitioning::Hash(hash_exprs.clone(), 4),
+ )
+ .unwrap(),
+ ) as Arc<dyn ExecutionPlan>;
+ let union_one_hash =
+ crate::union::UnionExec::try_new(vec![Arc::clone(&left),
hash_branch])
+ .unwrap();
+ assert!(
+ HashJoinExec::has_hash_repartition(&union_one_hash),
+ "UnionExec with RepartitionExec(Hash) on one branch should return
true"
+ );
+
+ // CoalescePartitionsExec -> UnionExec -> ... -> RepartitionExec(Hash)
should return true
+ let deep_union = Arc::new(CoalescePartitionsExec::new(
+ crate::union::UnionExec::try_new(vec![
+ Arc::clone(&left),
+ Arc::new(CoalescePartitionsExec::new(Arc::new(
+ RepartitionExec::try_new(
+ Arc::clone(&left),
+ Partitioning::Hash(hash_exprs, 4),
+ )
+ .unwrap(),
+ ))) as Arc<dyn ExecutionPlan>,
+ ])
+ .unwrap(),
+ )) as Arc<dyn ExecutionPlan>;
+ assert!(
+ HashJoinExec::has_hash_repartition(&deep_union),
+ "should find RepartitionExec(Hash) through UnionExec and multiple
nesting levels"
+ );
+ }
+
+ #[test]
+ fn test_should_use_partition_index() -> Result<()> {
+ let left = build_table(
+ ("a1", &vec![1, 2]),
+ ("b1", &vec![4, 5]),
+ ("c1", &vec![7, 8]),
+ );
+ let right = build_table(
+ ("a2", &vec![10, 20]),
+ ("b1", &vec![4, 5]),
+ ("c2", &vec![70, 80]),
+ );
+ let left_schema = left.schema();
+ let right_schema = right.schema();
+
+ let make_on = |ls: &Schema,
+ rs: &Schema|
+ -> Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> {
+ vec![(
+ Arc::new(Column::new_with_schema("b1", ls).unwrap()) as _,
+ Arc::new(Column::new_with_schema("b1", rs).unwrap()) as _,
+ )]
+ };
+
+ let make_hash_repart = |plan: Arc<dyn ExecutionPlan>,
+ schema: &Schema|
+ -> Arc<dyn ExecutionPlan> {
+ Arc::new(
+ RepartitionExec::try_new(
+ plan,
+ Partitioning::Hash(
+ vec![
+ Arc::new(Column::new_with_schema("b1",
schema).unwrap()) as _
+ ],
+ 4,
+ ),
+ )
+ .unwrap(),
+ )
+ };
+
+ // CollectLeft mode should never use partition index
+ let join = HashJoinExec::try_new(
+ Arc::clone(&left),
+ Arc::clone(&right),
+ make_on(&left_schema, &right_schema),
+ None,
+ &JoinType::Inner,
+ None,
+ PartitionMode::CollectLeft,
+ NullEquality::NullEqualsNothing,
+ false,
+ )?;
+ assert!(
+ !join.should_use_partition_index(),
+ "CollectLeft should never use partition index"
+ );
+
+ // Partitioned without RepartitionExec should use partition index
+ let join = HashJoinExec::try_new(
+ Arc::clone(&left),
+ Arc::clone(&right),
+ make_on(&left_schema, &right_schema),
+ None,
+ &JoinType::Inner,
+ None,
+ PartitionMode::Partitioned,
+ NullEquality::NullEqualsNothing,
+ false,
+ )?;
+ assert!(
+ join.should_use_partition_index(),
+ "no RepartitionExec on either side, should use partition index"
+ );
+
+ // Partitioned with RepartitionExec(Hash) on both sides should not use
partition index
+ let join = HashJoinExec::try_new(
+ make_hash_repart(Arc::clone(&left), &left_schema),
+ make_hash_repart(Arc::clone(&right), &right_schema),
+ make_on(&left_schema, &right_schema),
+ None,
+ &JoinType::Inner,
+ None,
+ PartitionMode::Partitioned,
+ NullEquality::NullEqualsNothing,
+ false,
+ )?;
+ assert!(
+ !join.should_use_partition_index(),
+ "both sides have Hash RepartitionExec, should use CASE routing"
+ );
+
+ // Partitioned with RepartitionExec on only one side should not use
partition index
+ let join = HashJoinExec::try_new(
+ make_hash_repart(Arc::clone(&left), &left_schema),
+ Arc::clone(&right),
+ make_on(&left_schema, &right_schema),
+ None,
+ &JoinType::Inner,
+ None,
+ PartitionMode::Partitioned,
+ NullEquality::NullEqualsNothing,
+ false,
+ )?;
+ assert!(
+ !join.should_use_partition_index(),
+ "one side has Hash RepartitionExec, should not use partition index"
+ );
+
+ Ok(())
+ }
Review Comment:
Same here. You may want to split this into many tests
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -769,6 +769,36 @@ impl HashJoinExec {
self.dynamic_filter.as_ref().map(|df| &df.filter)
}
+ /// Determines whether partition-index routing should be used instead of
CASE hash routing.
+ ///
+ /// Partition-index routing is enabled when:
+ /// 1. The join is in `Partitioned` mode
+ /// 2. Neither side has a `RepartitionExec(Hash)`, meaning both sides
preserve their file
+ /// partitioning.
+ fn should_use_partition_index(&self) -> bool {
+ if self.mode != PartitionMode::Partitioned {
+ return false;
+ }
+ !Self::has_hash_repartition(&self.left)
+ && !Self::has_hash_repartition(&self.right)
+ }
+
+ /// Walk the plan tree looking for a `RepartitionExec` with `Hash`
partitioning.
+ fn has_hash_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
+ let mut stack = vec![Arc::clone(plan)];
+ while let Some(current) = stack.pop() {
+ if let Some(repart) =
current.as_any().downcast_ref::<RepartitionExec>()
+ && matches!(repart.partitioning(), Partitioning::Hash(_, _))
+ {
+ return true;
+ }
+ for child in current.children() {
+ stack.push(Arc::clone(child));
+ }
+ }
+ false
+ }
Review Comment:
I wonder if this is the right walk. Should we only check whether there is a
RepartitionExec immediately before the join? Could this walk introduce bugs?
Maybe drawing some example plans on paper will help you determine whether this
is correct.
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -769,6 +769,36 @@ impl HashJoinExec {
self.dynamic_filter.as_ref().map(|df| &df.filter)
}
+ /// Determines whether partition-index routing should be used instead of
CASE hash routing.
+ ///
+ /// Partition-index routing is enabled when:
+ /// 1. The join is in `Partitioned` mode
Review Comment:
I wonder whether it would be useful to add a comment here noting that
CollectLeft is not a problem even with many partitions on the probe side. This
is because there is only one hash table, and it is used for pruning and
filtering across all partitions of the probe side.
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -5698,4 +5730,256 @@ mod tests {
.contains("null_aware anti join only supports single column
join key")
);
}
+
+ #[test]
+ fn test_has_hash_repartition() {
+ let left = build_table(
+ ("a1", &vec![1, 2]),
+ ("b1", &vec![4, 5]),
+ ("c1", &vec![7, 8]),
+ );
+
+ // DataSource -> RepartitionExec(Hash) should return true
+ let schema = left.schema();
+ let hash_exprs =
+ vec![Arc::new(Column::new_with_schema("b1", &schema).unwrap()) as
_];
+ let hash_repartition = Arc::new(
+ RepartitionExec::try_new(
+ Arc::clone(&left),
+ Partitioning::Hash(hash_exprs.clone(), 4),
+ )
+ .unwrap(),
+ );
+ assert!(
+ HashJoinExec::has_hash_repartition(
+ &(hash_repartition as Arc<dyn ExecutionPlan>)
+ ),
+ "should detect Hash RepartitionExec"
+ );
+
+ // DataSource -> RepartitionExec(RoundRobin) should return false
+ let round_robin_repartition = Arc::new(
+ RepartitionExec::try_new(Arc::clone(&left),
Partitioning::RoundRobinBatch(4))
+ .unwrap(),
+ );
+ assert!(
+ !HashJoinExec::has_hash_repartition(
+ &(round_robin_repartition as Arc<dyn ExecutionPlan>)
+ ),
+ "RoundRobin should not be detected as hash repartition"
+ );
+
+ // DataSource -> RepartitionExec(UnknownPartitioning) should return
false
+ let unknown_repartition = Arc::new(
+ RepartitionExec::try_new(
+ Arc::clone(&left),
+ Partitioning::UnknownPartitioning(4),
+ )
+ .unwrap(),
+ );
+ assert!(
+ !HashJoinExec::has_hash_repartition(
+ &(unknown_repartition as Arc<dyn ExecutionPlan>)
+ ),
+ "UnknownPartitioning should not be detected as hash repartition"
+ );
+
+ // Plain DataSource (no RepartitionExec) should return false
+ assert!(
+ !HashJoinExec::has_hash_repartition(&left),
+ "plan without RepartitionExec should return false"
+ );
+
+ // CoalescePartitionsExec -> RepartitionExec(Hash) should return true
+ let coalesce = Arc::new(CoalescePartitionsExec::new(Arc::new(
+ RepartitionExec::try_new(
+ Arc::clone(&left),
+ Partitioning::Hash(hash_exprs.clone(), 4),
+ )
+ .unwrap(),
+ ))) as Arc<dyn ExecutionPlan>;
+ assert!(
+ HashJoinExec::has_hash_repartition(&coalesce),
+ "should traverse through CoalescePartitionsExec to find Hash
RepartitionExec"
+ );
+
+ // CoalescePartitionsExec -> DataSource (no RepartitionExec) should
return false
+ let coalesce_no_repart =
Arc::new(CoalescePartitionsExec::new(Arc::clone(&left)))
+ as Arc<dyn ExecutionPlan>;
+ assert!(
+ !HashJoinExec::has_hash_repartition(&coalesce_no_repart),
+ "should return false when traversing through
CoalescePartitionsExec with no RepartitionExec"
+ );
+
+ // Multiple single-child operators -> RepartitionExec(Hash) should
return true
+ let deep_chain = Arc::new(CoalescePartitionsExec::new(Arc::new(
+ CoalescePartitionsExec::new(Arc::new(
+ RepartitionExec::try_new(
+ Arc::clone(&left),
+ Partitioning::Hash(hash_exprs.clone(), 4),
+ )
+ .unwrap(),
+ )),
+ ))) as Arc<dyn ExecutionPlan>;
+ assert!(
+ HashJoinExec::has_hash_repartition(&deep_chain),
+ "should traverse through multiple single-child operators to find
Hash RepartitionExec"
+ );
+
+ // UnionExec with no RepartitionExec on either branch should return
false
+ let union_no_repart =
+ crate::union::UnionExec::try_new(vec![Arc::clone(&left),
Arc::clone(&left)])
+ .unwrap();
+ assert!(
+ !HashJoinExec::has_hash_repartition(&union_no_repart),
+ "UnionExec with no RepartitionExec on any branch should return
false"
+ );
+
+ // UnionExec with RepartitionExec(Hash) on one branch should return
true
+ let hash_branch = Arc::new(
+ RepartitionExec::try_new(
+ Arc::clone(&left),
+ Partitioning::Hash(hash_exprs.clone(), 4),
+ )
+ .unwrap(),
+ ) as Arc<dyn ExecutionPlan>;
+ let union_one_hash =
+ crate::union::UnionExec::try_new(vec![Arc::clone(&left),
hash_branch])
+ .unwrap();
+ assert!(
+ HashJoinExec::has_hash_repartition(&union_one_hash),
+ "UnionExec with RepartitionExec(Hash) on one branch should return
true"
+ );
+
+ // CoalescePartitionsExec -> UnionExec -> ... -> RepartitionExec(Hash)
should return true
+ let deep_union = Arc::new(CoalescePartitionsExec::new(
+ crate::union::UnionExec::try_new(vec![
+ Arc::clone(&left),
+ Arc::new(CoalescePartitionsExec::new(Arc::new(
+ RepartitionExec::try_new(
+ Arc::clone(&left),
+ Partitioning::Hash(hash_exprs, 4),
+ )
+ .unwrap(),
+ ))) as Arc<dyn ExecutionPlan>,
+ ])
+ .unwrap(),
+ )) as Arc<dyn ExecutionPlan>;
+ assert!(
+ HashJoinExec::has_hash_repartition(&deep_union),
+ "should find RepartitionExec(Hash) through UnionExec and multiple
nesting levels"
+ );
+ }
Review Comment:
I think you will get a comment to split this test into many tests
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -279,15 +281,182 @@ async fn
test_dynamic_filter_pushdown_through_hash_join_with_topk() {
// Iterate one batch
stream.next().await.unwrap().unwrap();
- // Test that filters are pushed down correctly to each side of the join
- // NOTE: We dropped the CASE expression here because we now optimize that
away if there's only 1 partition
+ // Test that filters are pushed down correctly to each side of the join.
+ // This test has no RepartitionExec, so partition-index routing is used.
insta::assert_snapshot!(
format_plan_for_test(&plan),
@r"
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false],
filter=[e@4 IS NULL OR e@4 < bb]
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
- - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[d, e, f], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ]
AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[d, e, f], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ {0: d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa,
ab])} ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
+ "
+ );
+}
+
+#[tokio::test]
+async fn test_topk_with_partition_index_routing() {
+ let build_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Float64, false),
+ ]));
+
+ let probe_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("e", DataType::Float64, false),
+ ]));
+
+ // Build side: different data per partition.
+ // Partition 0: ("aa", "ba", 1.0) -> bounds a:[aa,aa], b:[ba,ba]
+ // Partition 1: ("zz", "zz", 2.0) -> bounds a:[zz,zz], b:[zz,zz]
+ let build_p0 = vec![
+ record_batch!(
+ ("a", Utf8, ["aa"]),
+ ("b", Utf8, ["ba"]),
+ ("c", Float64, [1.0])
+ )
+ .unwrap(),
+ ];
+ let build_p1 = vec![
+ record_batch!(
+ ("a", Utf8, ["zz"]),
+ ("b", Utf8, ["zz"]),
+ ("c", Float64, [2.0])
+ )
+ .unwrap(),
+ ];
+
+ // Probe side: each partition has matching and non-matching rows.
+ // Partition 0: ("aa","ba",10.0) matches p0, ("zz","zz",20.0) does not
match p0
+ // Partition 1: ("zz","zz",30.0) matches p1, ("aa","ba",40.0) does not
match p1
Review Comment:
For your tests, I think this data is OK. However, it is not clear in the
context of a `partitioned hash join`. You may want to use data that clearly
defines the partitions on both the build and probe sides, and make the build
side smaller and more scattered so it can filter data from the probe side.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]