alamb commented on a change in pull request #9882:
URL: https://github.com/apache/arrow/pull/9882#discussion_r607332073



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -86,6 +86,17 @@ pub struct HashJoinExec {
     build_side: Arc<Mutex<Option<JoinLeftData>>>,
     /// Shares the `RandomState` for the hashing algorithm
     random_state: RandomState,
+    /// Partitioning mode to use
+    mode: PartitionMode,
+}
+
+#[derive(Clone, Copy, Debug, PartialEq)]
+/// Partitioning mode to use for hash join
+pub enum PartitionMode {
+    /// Left/right children are partitioned using the left and right keys
+    Partitioned,
+    /// Left side will be merged and collected into one partition
+    MergeLeft,

Review comment:
       maybe `CollectLeft`? Every time I hear "Merge" I think of "merging 
sorted streams" (e.g. the DataFusion `MergeExec` operator still confuses me 
each time I see its name)

##########
File path: rust/benchmarks/src/bin/tpch.rs
##########
@@ -1537,7 +1537,7 @@ mod tests {
                 Field::new("c_name", DataType::Utf8, true),
                 Field::new("c_custkey", DataType::Int32, true),
                 Field::new("o_orderkey", DataType::Int32, true),
-                Field::new("o_orderdat", DataType::Date32, true),

Review comment:
       👍  nice

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -344,13 +345,33 @@ impl DefaultPhysicalPlanner {
                     JoinType::Left => hash_utils::JoinType::Left,
                     JoinType::Right => hash_utils::JoinType::Right,
                 };
-
-                Ok(Arc::new(HashJoinExec::try_new(
-                    left,
-                    right,
-                    &keys,
-                    &physical_join_type,
-                )?))
+                if ctx_state.config.concurrency > 1 {
+                    let left_expr = keys.iter().map(|x| col(&x.0)).collect();
+                    let right_expr = keys.iter().map(|x| col(&x.1)).collect();
+
+                    // Use hash partitioning by default to parallelize hash joins
+                    Ok(Arc::new(HashJoinExec::try_new(
+                        Arc::new(RepartitionExec::try_new(

Review comment:
       One thing I worry about is that `RepartitionExec` has the potential to 
(effectively) buffer up the entire result set if the producer makes data 
faster than the consumer can read it -- e.g. it uses unbounded streams: 
https://github.com/apache/arrow/blob/8e43f23dcc6a9e630516228f110c48b64d13cec6/rust/datafusion/src/physical_plan/repartition.rs#L121-L126

   Thus this particular structure might end up trading memory usage for 
keeping more cores busy -- which might or might not be the right trade-off 
depending on the system.

   I wonder if you would be amenable to adding a new setting to 
`ExecutionConfig` that controls this optimization?

   So like
   ```
                   if ctx_state.config.concurrency > 1 && 
ctx_state.config.repartition_joins {
   ```
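   
   For concreteness, a rough sketch of what such a setting could look like on 
`ExecutionConfig`, shown alongside the existing `concurrency` field (the 
`repartition_joins` field and `with_repartition_joins` method names are just 
suggestions, not existing API):
   ```
   #[derive(Clone)]
   pub struct ExecutionConfig {
       /// Number of concurrent tasks to use for query execution
       pub concurrency: usize,
       /// Whether to hash-repartition join inputs to use more cores, at the
       /// cost of potentially buffering more data in RepartitionExec
       pub repartition_joins: bool,
   }

   impl ExecutionConfig {
       /// Enable or disable repartitioned hash joins
       pub fn with_repartition_joins(mut self, enabled: bool) -> Self {
           self.repartition_joins = enabled;
           self
       }
   }
   ```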

##########
File path: rust/datafusion/tests/dataframe.rs
##########
@@ -71,8 +71,7 @@ async fn join() -> Result<()> {
 
     let a = df1.join(df2, JoinType::Inner, &["a"], &["a"])?;
 
-    let batches = a.collect().await?;
-    assert_eq!(batches.len(), 1);
+    let _ = a.collect().await?;

Review comment:
       maybe we could change the test to assert on the total number of rows in 
the result set rather than the number of batches?
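   
   Something like this, perhaps (the expected count is a placeholder -- I have 
not checked the actual row count of the joined data):
   ```
       let batches = a.collect().await?;
       let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
       assert_eq!(total_rows, 4); // placeholder expected row count
   ```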




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

