Dandandan commented on a change in pull request #9882:
URL: https://github.com/apache/arrow/pull/9882#discussion_r608141787



##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -344,13 +345,33 @@ impl DefaultPhysicalPlanner {
                     JoinType::Left => hash_utils::JoinType::Left,
                     JoinType::Right => hash_utils::JoinType::Right,
                 };
-
-                Ok(Arc::new(HashJoinExec::try_new(
-                    left,
-                    right,
-                    &keys,
-                    &physical_join_type,
-                )?))
+                if ctx_state.config.concurrency > 1 {
+                    let left_expr = keys.iter().map(|x| col(&x.0)).collect();
+                    let right_expr = keys.iter().map(|x| col(&x.1)).collect();
+
+                    // Use hash partition by default to parallelize hash joins
+                    Ok(Arc::new(HashJoinExec::try_new(
+                        Arc::new(RepartitionExec::try_new(

Review comment:
       You are totally right about this.
   
   For Ballista, @andygrove was planning to implement a shuffle hash join, which 
should be pretty similar to this, except that the partitions are shuffled to 
different workers. When the data is balanced, adding more nodes should then help 
process more data, even without spilling to disk, provided there are no other 
sources of high memory usage or unbounded buffering.
   A sort-merge join implementation would also make sense for Ballista in the 
longer term, and would probably be great to have in DataFusion too (alongside any 
other scalable algorithms).
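
   To illustrate why hash partitioning lets each partition be joined 
independently, here is a minimal sketch using plain vectors instead of Arrow 
record batches. It is not the DataFusion or Ballista implementation: `Row`, 
`hash_partition`, `join_partition` and `partitioned_hash_join` are hypothetical 
names made up for this example, and the join is a simple inner join on a single 
`u64` key.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical row type: (join key, payload).
type Row = (u64, String);

/// Assign each row to one of `n` partitions by hashing its join key.
/// Rows with equal keys always land in the same partition index.
fn hash_partition(rows: &[Row], n: usize) -> Vec<Vec<Row>> {
    let mut parts = vec![Vec::new(); n];
    for (key, payload) in rows {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let idx = (hasher.finish() % n as u64) as usize;
        parts[idx].push((*key, payload.clone()));
    }
    parts
}

/// Inner hash join of one partition pair: build on the left, probe with the right.
fn join_partition(left: &[Row], right: &[Row]) -> Vec<(u64, String, String)> {
    let mut build: HashMap<u64, Vec<&str>> = HashMap::new();
    for (k, v) in left {
        build.entry(*k).or_default().push(v.as_str());
    }
    let mut out = Vec::new();
    for (k, r) in right {
        if let Some(matches) = build.get(k) {
            for l in matches {
                out.push((*k, l.to_string(), r.clone()));
            }
        }
    }
    out
}

/// Partition both inputs with the same hash function, then join partition i of
/// the left side only with partition i of the right side. Each pair is
/// independent, so it could run on its own thread or (in a shuffle join) worker.
fn partitioned_hash_join(left: &[Row], right: &[Row], n: usize) -> Vec<(u64, String, String)> {
    let l = hash_partition(left, n);
    let r = hash_partition(right, n);
    let mut out: Vec<_> = l
        .iter()
        .zip(r.iter())
        .flat_map(|(lp, rp)| join_partition(lp, rp))
        .collect();
    out.sort(); // deterministic order, only for easy comparison
    out
}
```

   Each build-side hash map only holds its own partition's share of the keys, 
which is why, with balanced data, more partitions (or more workers) lower the 
memory needed per partition.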





