gene-bordegaray commented on code in PR #20246:
URL: https://github.com/apache/datafusion/pull/20246#discussion_r2795537558
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -769,6 +769,36 @@ impl HashJoinExec {
self.dynamic_filter.as_ref().map(|df| &df.filter)
}
+ /// Determines whether partition-index routing should be used instead of CASE hash routing.
+ ///
+ /// Partition-index routing is enabled when:
+ /// 1. The join is in `Partitioned` mode
+ /// 2. Neither side has a `RepartitionExec(Hash)`, meaning both sides preserve their file
+ /// partitioning.
+ fn should_use_partition_index(&self) -> bool {
+ if self.mode != PartitionMode::Partitioned {
+ return false;
+ }
+ !Self::has_hash_repartition(&self.left)
+ && !Self::has_hash_repartition(&self.right)
+ }
+
+ /// Walk the plan tree looking for a `RepartitionExec` with `Hash` partitioning.
+ fn has_hash_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
+ let mut stack = vec![Arc::clone(plan)];
+ while let Some(current) = stack.pop() {
+ if let Some(repart) = current.as_any().downcast_ref::<RepartitionExec>()
+ && matches!(repart.partitioning(), Partitioning::Hash(_, _))
+ {
+ return true;
+ }
+ for child in current.children() {
+ stack.push(Arc::clone(child));
+ }
+ }
+ false
+ }
Review Comment:
I added the logic in `enforce_distribution.rs`, but it is a bit more involved
than the decision to use a partitioned hash join. I have added documentation
explaining the cases and the method used.
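
For anyone reading the hunk above without the rest of the PR, here is a minimal standalone sketch of the stack-based walk it performs. It is not the code in `enforce_distribution.rs`. The `Node` enum and the `partitioned` flag are hypothetical stand-ins for `Arc<dyn ExecutionPlan>` and `PartitionMode::Partitioned`, assumed only so the example compiles on its own.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a physical plan node (not DataFusion's `ExecutionPlan`).
#[derive(Debug)]
enum Node {
    /// A repartition operator; `hash` is true when it uses hash partitioning.
    Repartition { hash: bool, input: Arc<Node> },
    /// Any other operator, with zero or more inputs.
    Other { inputs: Vec<Arc<Node>> },
}

impl Node {
    /// Direct children of this node.
    fn children(&self) -> Vec<&Arc<Node>> {
        match self {
            Node::Repartition { input, .. } => vec![input],
            Node::Other { inputs } => inputs.iter().collect(),
        }
    }
}

/// Iterative depth-first walk that returns true as soon as a hash
/// repartition is found anywhere in the subtree rooted at `plan`.
fn has_hash_repartition(plan: &Arc<Node>) -> bool {
    let mut stack = vec![Arc::clone(plan)];
    while let Some(current) = stack.pop() {
        if matches!(&*current, Node::Repartition { hash: true, .. }) {
            return true;
        }
        for child in current.children() {
            stack.push(Arc::clone(child));
        }
    }
    false
}

/// Mirrors the decision in the hunk: the join must be partitioned and
/// neither input may contain a hash repartition.
/// `partitioned` is an assumed simplification of the join's partition mode.
fn should_use_partition_index(
    partitioned: bool,
    left: &Arc<Node>,
    right: &Arc<Node>,
) -> bool {
    partitioned && !has_hash_repartition(left) && !has_hash_repartition(right)
}
```

In this sketch a non-hash repartition on either side does not disable partition-index routing, while a hash repartition nested at any depth does.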