This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fcd17c85c2 Minor: Encapsulate `LeftJoinData` into a struct (rather than anonymous enum) and add comments (#8153)
fcd17c85c2 is described below

commit fcd17c85c2eba1c5c8d92beb52f4351286f2dcea
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Nov 14 03:12:04 2023 -0500

    Minor: Encapsulate `LeftJoinData` into a struct (rather than anonymous enum) and add comments (#8153)
    
    * Minor: Encapsulate LeftJoinData into a struct (rather than anonymous enum)
    
    * clippy
---
 datafusion/physical-plan/src/joins/hash_join.rs    | 72 ++++++++++++++++------
 .../physical-plan/src/joins/hash_join_utils.rs     |  9 ++-
 2 files changed, 61 insertions(+), 20 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 546a929bf9..da57fa07cc 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -73,7 +73,47 @@ use datafusion_physical_expr::EquivalenceProperties;
 use ahash::RandomState;
 use futures::{ready, Stream, StreamExt, TryStreamExt};
 
-type JoinLeftData = (JoinHashMap, RecordBatch, MemoryReservation);
+/// HashTable and input data for the left (build side) of a join
+struct JoinLeftData {
+    /// The hash table with indices into `batch`
+    hash_map: JoinHashMap,
+    /// The input rows for the build side
+    batch: RecordBatch,
+    /// Memory reservation that tracks memory used by `hash_map` hash table
+    /// `batch`. Cleared on drop.
+    #[allow(dead_code)]
+    reservation: MemoryReservation,
+}
+
+impl JoinLeftData {
+    /// Create a new `JoinLeftData` from its parts
+    fn new(
+        hash_map: JoinHashMap,
+        batch: RecordBatch,
+        reservation: MemoryReservation,
+    ) -> Self {
+        Self {
+            hash_map,
+            batch,
+            reservation,
+        }
+    }
+
+    /// Returns the number of rows in the build side
+    fn num_rows(&self) -> usize {
+        self.batch.num_rows()
+    }
+
+    /// return a reference to the hash map
+    fn hash_map(&self) -> &JoinHashMap {
+        &self.hash_map
+    }
+
+    /// returns a reference to the build side batch
+    fn batch(&self) -> &RecordBatch {
+        &self.batch
+    }
+}
 
 /// Join execution plan: Evaluates eqijoin predicates in parallel on multiple
 /// partitions using a hash table and an optional filter list to apply post
@@ -692,8 +732,9 @@ async fn collect_left_input(
     // Merge all batches into a single batch, so we
     // can directly index into the arrays
     let single_batch = concat_batches(&schema, &batches, num_rows)?;
+    let data = JoinLeftData::new(hashmap, single_batch, reservation);
 
-    Ok((hashmap, single_batch, reservation))
+    Ok(data)
 }
 
 /// Updates `hash` with new entries from [RecordBatch] evaluated against the 
expressions `on`,
@@ -770,7 +811,7 @@ struct HashJoinStream {
     left_fut: OnceFut<JoinLeftData>,
     /// Which left (probe) side rows have been matches while creating output.
     /// For some OUTER joins, we need to know which rows have not been matched
-    /// to produce the correct.
+    /// to produce the correct output.
     visited_left_side: Option<BooleanBufferBuilder>,
     /// right (probe) input
     right: SendableRecordBatchStream,
@@ -1042,13 +1083,13 @@ impl HashJoinStream {
         {
             // TODO: Replace `ceil` wrapper with stable `div_cell` after
             // https://github.com/rust-lang/rust/issues/88581
-            let visited_bitmap_size = bit_util::ceil(left_data.1.num_rows(), 
8);
+            let visited_bitmap_size = bit_util::ceil(left_data.num_rows(), 8);
             self.reservation.try_grow(visited_bitmap_size)?;
             self.join_metrics.build_mem_used.add(visited_bitmap_size);
         }
 
         let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
-            let num_rows = left_data.1.num_rows();
+            let num_rows = left_data.num_rows();
             if need_produce_result_in_final(self.join_type) {
                 // Some join types need to track which row has be matched or 
unmatched:
                 // `left semi` join:  need to use the bitmap to produce the 
matched row in the left side
@@ -1075,8 +1116,8 @@ impl HashJoinStream {
 
                     // get the matched two indices for the on condition
                     let left_right_indices = 
build_equal_condition_join_indices(
-                        &left_data.0,
-                        &left_data.1,
+                        left_data.hash_map(),
+                        left_data.batch(),
                         &batch,
                         &self.on_left,
                         &self.on_right,
@@ -1108,7 +1149,7 @@ impl HashJoinStream {
 
                             let result = build_batch_from_indices(
                                 &self.schema,
-                                &left_data.1,
+                                left_data.batch(),
                                 &batch,
                                 &left_side,
                                 &right_side,
@@ -1140,7 +1181,7 @@ impl HashJoinStream {
                         // use the left and right indices to produce the batch 
result
                         let result = build_batch_from_indices(
                             &self.schema,
-                            &left_data.1,
+                            left_data.batch(),
                             &empty_right_batch,
                             &left_side,
                             &right_side,
@@ -2519,16 +2560,11 @@ mod tests {
             ("c", &vec![30, 40]),
         );
 
-        let left_data = (
-            JoinHashMap {
-                map: hashmap_left,
-                next,
-            },
-            left,
-        );
+        let join_hash_map = JoinHashMap::new(hashmap_left, next);
+
         let (l, r) = build_equal_condition_join_indices(
-            &left_data.0,
-            &left_data.1,
+            &join_hash_map,
+            &left,
             &right,
             &[Column::new("a", 0)],
             &[Column::new("a", 0)],
diff --git a/datafusion/physical-plan/src/joins/hash_join_utils.rs b/datafusion/physical-plan/src/joins/hash_join_utils.rs
index 5ebf370b6d..fecbf96f08 100644
--- a/datafusion/physical-plan/src/joins/hash_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/hash_join_utils.rs
@@ -103,12 +103,17 @@ use hashbrown::HashSet;
 /// ```
 pub struct JoinHashMap {
     // Stores hash value to last row index
-    pub map: RawTable<(u64, u64)>,
+    map: RawTable<(u64, u64)>,
     // Stores indices in chained list data structure
-    pub next: Vec<u64>,
+    next: Vec<u64>,
 }
 
 impl JoinHashMap {
+    #[cfg(test)]
+    pub(crate) fn new(map: RawTable<(u64, u64)>, next: Vec<u64>) -> Self {
+        Self { map, next }
+    }
+
     pub(crate) fn with_capacity(capacity: usize) -> Self {
         JoinHashMap {
             map: RawTable::with_capacity(capacity),

Reply via email to