Dandandan commented on a change in pull request #9882:
URL: https://github.com/apache/arrow/pull/9882#discussion_r606680992



##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -223,18 +237,76 @@ impl ExecutionPlan for HashJoinExec {
 
     async fn execute(&self, partition: usize) -> 
Result<SendableRecordBatchStream> {
         let on_left = self.on.iter().map(|on| 
on.0.clone()).collect::<Vec<_>>();
-
-        // we only want to compute the build side once
+        // we only want to compute the build side once for 
PartitionMode::MergeLeft
         let left_data = {
-            let mut build_side = self.build_side.lock().await;
-            match build_side.as_ref() {
-                Some(stream) => stream.clone(),
-                None => {
+            match self.mode {
+                PartitionMode::MergeLeft => {
+                    let mut build_side = self.build_side.lock().await;
+
+                    match build_side.as_ref() {
+                        Some(stream) => stream.clone(),
+                        None => {
+                            let start = Instant::now();
+
+                            // merge all left parts into a single stream
+                            let merge = MergeExec::new(self.left.clone());
+                            let stream = merge.execute(0).await?;
+
+                            // This operation performs 2 steps at once:
+                            // 1. creates a [JoinHashMap] of all batches from 
the stream
+                            // 2. stores the batches in a vector.
+                            let initial = (
+                                JoinHashMap::with_hasher(IdHashBuilder {}),
+                                Vec::new(),
+                                0,
+                                Vec::new(),
+                            );
+                            let (hashmap, batches, num_rows, _) = stream
+                                .try_fold(initial, |mut acc, batch| async {
+                                    let hash = &mut acc.0;
+                                    let values = &mut acc.1;
+                                    let offset = acc.2;
+                                    acc.3.clear();
+                                    acc.3.resize(batch.num_rows(), 0);
+                                    update_hash(
+                                        &on_left,
+                                        &batch,
+                                        hash,
+                                        offset,
+                                        &self.random_state,
+                                        &mut acc.3,
+                                    )
+                                    .unwrap();
+                                    acc.2 += batch.num_rows();
+                                    values.push(batch);
+                                    Ok(acc)
+                                })
+                                .await?;
+
+                            // Merge all batches into a single batch, so we
+                            // can directly index into the arrays
+                            let single_batch =
+                                concat_batches(&self.left.schema(), &batches, 
num_rows)?;
+
+                            let left_side = Arc::new((hashmap, single_batch));
+
+                            *build_side = Some(left_side.clone());
+
+                            debug!(
+                            "Built build-side of hash join containing {} rows 
in {} ms",
+                            num_rows,
+                            start.elapsed().as_millis()
+                        );
+
+                            left_side
+                        }
+                    }
+                }
+                PartitionMode::Partitioned => {
                     let start = Instant::now();
 
-                    // merge all left parts into a single stream
-                    let merge = MergeExec::new(self.left.clone());
-                    let stream = merge.execute(0).await?;
+                    // Load 1 partition of left side in memory
+                    let stream = self.left.execute(partition).await?;

Review comment:
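       Here is the main difference: in `PartitionMode::Partitioned` we load
       only the requested partition of the left side
       (`self.left.execute(partition)`) instead of merging all left
       partitions into a single stream with `MergeExec`.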




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to