Dandandan commented on a change in pull request #9882: URL: https://github.com/apache/arrow/pull/9882#discussion_r606680992
########## File path: rust/datafusion/src/physical_plan/hash_join.rs ########## @@ -223,18 +237,76 @@ impl ExecutionPlan for HashJoinExec { async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> { let on_left = self.on.iter().map(|on| on.0.clone()).collect::<Vec<_>>(); - - // we only want to compute the build side once + // we only want to compute the build side once for PartitionMode::MergeLeft let left_data = { - let mut build_side = self.build_side.lock().await; - match build_side.as_ref() { - Some(stream) => stream.clone(), - None => { + match self.mode { + PartitionMode::MergeLeft => { + let mut build_side = self.build_side.lock().await; + + match build_side.as_ref() { + Some(stream) => stream.clone(), + None => { + let start = Instant::now(); + + // merge all left parts into a single stream + let merge = MergeExec::new(self.left.clone()); + let stream = merge.execute(0).await?; + + // This operation performs 2 steps at once: + // 1. creates a [JoinHashMap] of all batches from the stream + // 2. stores the batches in a vector. 
+ let initial = ( + JoinHashMap::with_hasher(IdHashBuilder {}), + Vec::new(), + 0, + Vec::new(), + ); + let (hashmap, batches, num_rows, _) = stream + .try_fold(initial, |mut acc, batch| async { + let hash = &mut acc.0; + let values = &mut acc.1; + let offset = acc.2; + acc.3.clear(); + acc.3.resize(batch.num_rows(), 0); + update_hash( + &on_left, + &batch, + hash, + offset, + &self.random_state, + &mut acc.3, + ) + .unwrap(); + acc.2 += batch.num_rows(); + values.push(batch); + Ok(acc) + }) + .await?; + + // Merge all batches into a single batch, so we + // can directly index into the arrays + let single_batch = + concat_batches(&self.left.schema(), &batches, num_rows)?; + + let left_side = Arc::new((hashmap, single_batch)); + + *build_side = Some(left_side.clone()); + + debug!( + "Built build-side of hash join containing {} rows in {} ms", + num_rows, + start.elapsed().as_millis() + ); + + left_side + } + } + } + PartitionMode::Partitioned => { let start = Instant::now(); - // merge all left parts into a single stream - let merge = MergeExec::new(self.left.clone()); - let stream = merge.execute(0).await?; + // Load 1 partition of left side in memory + let stream = self.left.execute(partition).await?; Review comment: Here is the main difference: in `PartitionMode::Partitioned`, only the requested partition of the left side is loaded via `self.left.execute(partition)`, instead of merging all left partitions into a single stream with `MergeExec`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org