Dandandan commented on a change in pull request #9882: URL: https://github.com/apache/arrow/pull/9882#discussion_r606680978
########## File path: rust/datafusion/src/physical_plan/hash_join.rs ########## @@ -223,18 +237,76 @@ impl ExecutionPlan for HashJoinExec { async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> { let on_left = self.on.iter().map(|on| on.0.clone()).collect::<Vec<_>>(); - - // we only want to compute the build side once + // we only want to compute the build side once for PartitionMode::MergeLeft let left_data = { - let mut build_side = self.build_side.lock().await; - match build_side.as_ref() { - Some(stream) => stream.clone(), - None => { + match self.mode { + PartitionMode::MergeLeft => { + let mut build_side = self.build_side.lock().await; + + match build_side.as_ref() { + Some(stream) => stream.clone(), + None => { + let start = Instant::now(); + + // merge all left parts into a single stream + let merge = MergeExec::new(self.left.clone()); + let stream = merge.execute(0).await?; + + // This operation performs 2 steps at once: + // 1. creates a [JoinHashMap] of all batches from the stream + // 2. stores the batches in a vector. + let initial = ( + JoinHashMap::with_hasher(IdHashBuilder {}), + Vec::new(), + 0, + Vec::new(), + ); + let (hashmap, batches, num_rows, _) = stream Review comment: This code is the "old" version; the diff shows it in a misleading way. The code is duplicated a bit; I couldn't yet manage to extract it into a function without running into a compile error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org