Dandandan commented on a change in pull request #9882: URL: https://github.com/apache/arrow/pull/9882#discussion_r608141787
##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -344,13 +345,33 @@ impl DefaultPhysicalPlanner {
                     JoinType::Left => hash_utils::JoinType::Left,
                     JoinType::Right => hash_utils::JoinType::Right,
                 };
-
-                Ok(Arc::new(HashJoinExec::try_new(
-                    left,
-                    right,
-                    &keys,
-                    &physical_join_type,
-                )?))
+                if ctx_state.config.concurrency > 1 {
+                    let left_expr = keys.iter().map(|x| col(&x.0)).collect();
+                    let right_expr = keys.iter().map(|x| col(&x.1)).collect();
+
+                    // Use hash partition by defualt to parallelize hash joins
+                    Ok(Arc::new(HashJoinExec::try_new(
+                        Arc::new(RepartitionExec::try_new(

Review comment:
You are totally right about this.

For Ballista, @andygrove was planning to implement a shuffle hash join, which should be pretty similar to this, except that the data/partitions would be shuffled to different workers.
When the data is balanced, adding more nodes should help process more data, even without spilling to disk, given we don't have other sources of high memory usage or unbounded buffering.

A sort-merge join implementation would also make sense for Ballista in the longer term and would probably be great to have in DataFusion too (next to any other scalable algorithms).
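
To make the partitioned hash join idea concrete, here is a minimal standalone sketch using only the Rust standard library. It is not DataFusion's or Ballista's implementation: `Row`, `partition_of`, `hash_repartition`, `hash_join` and `partitioned_hash_join` are hypothetical names, rows are simple `(key, payload)` pairs rather than Arrow batches, and the partitions are joined sequentially where a real engine would hand each pair to its own thread or worker.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical row type: (join key, payload). Real engines use columnar batches.
type Row = (u64, String);

/// Map a key to one of `n` partitions by hashing it (assumes n > 0).
fn partition_of(key: u64, n: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % n as u64) as usize
}

/// Split rows into `n` partitions; equal keys always land in the same partition.
fn hash_repartition(rows: Vec<Row>, n: usize) -> Vec<Vec<Row>> {
    assert!(n > 0, "need at least one partition");
    let mut parts: Vec<Vec<Row>> = vec![Vec::new(); n];
    for row in rows {
        let p = partition_of(row.0, n);
        parts[p].push(row);
    }
    parts
}

/// Inner hash join of one pair of inputs: build a table on the left, probe with the right.
fn hash_join(left: &[Row], right: &[Row]) -> Vec<(u64, String, String)> {
    let mut build: HashMap<u64, Vec<&String>> = HashMap::new();
    for (k, v) in left {
        build.entry(*k).or_default().push(v);
    }
    let mut out = Vec::new();
    for (k, rv) in right {
        if let Some(lvs) = build.get(k) {
            for lv in lvs {
                out.push((*k, (*lv).clone(), rv.clone()));
            }
        }
    }
    out
}

/// Repartition both sides on the join key, then join partition i with partition i.
/// Each pair is independent, so the pairs could run on separate threads or workers;
/// this sketch simply runs them one after another.
fn partitioned_hash_join(
    left: Vec<Row>,
    right: Vec<Row>,
    n: usize,
) -> Vec<(u64, String, String)> {
    let left_parts = hash_repartition(left, n);
    let right_parts = hash_repartition(right, n);
    left_parts
        .iter()
        .zip(right_parts.iter())
        .flat_map(|(l, r)| hash_join(l, r))
        .collect()
}
```

Because both sides are partitioned with the same hash function, matching keys can only meet within the same partition index, so each build-side hash table holds roughly 1/n of the left input when the keys are balanced. That is the property that lets more partitions (or more nodes) handle more data without spilling.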