Dandandan commented on code in PR #6913:
URL: https://github.com/apache/arrow-datafusion/pull/6913#discussion_r1259511279


##########
datafusion/core/src/physical_plan/joins/hash_join_utils.rs:
##########
@@ -87,28 +87,77 @@ use datafusion_common::Result;
 // | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1)
 // ---------------------
 
-// TODO: speed up collision checks
-// https://github.com/apache/arrow-datafusion/issues/50
+const MIN_JOIN_HASH_MAP_LEN: usize = 1024;
+
 pub struct JoinHashMap {
-    // Stores hash value to first index
-    pub map: RawTable<(u64, u64)>,
+    // Stores the first index for a bucket
+    buckets: Vec<u64>,
     // Stores indices in chained list data structure
     pub next: Vec<u64>,
+    // Stores the bucket mask for quickly finding a bucket for a hash value
+    bucket_mask: usize,
 }
 
-/// SymmetricJoinHashMap is similar to JoinHashMap, except that it stores the indices inline, allowing it to mutate
-/// and shrink the indices.
-pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
-
 impl JoinHashMap {
     pub(crate) fn with_capacity(capacity: usize) -> Self {
+        let mut bucket_capacity = capacity.next_power_of_two();
+        if bucket_capacity < MIN_JOIN_HASH_MAP_LEN {
+            bucket_capacity = MIN_JOIN_HASH_MAP_LEN;
+        }
+        let bucket_mask = bucket_capacity - 1;
         JoinHashMap {
-            map: RawTable::with_capacity(capacity),
+            buckets: vec![0; bucket_capacity],
             next: vec![0; capacity],
+            bucket_mask,
         }
     }
+
+    /// Only used for testing
+    pub(crate) fn with_bucket_capacity(bucket_capacity: usize, capacity: usize) -> Self {
+        assert!(bucket_capacity > 0);
+        let bucket_capacity = bucket_capacity.next_power_of_two();
+        let bucket_mask = bucket_capacity - 1;
+        JoinHashMap {
+            buckets: vec![0; bucket_capacity],
+            next: vec![0; capacity],
+            bucket_mask,
+        }
+    }
+
+    #[inline]
+    pub(crate) fn insert(&mut self, hash_value: u64, index: usize) {
+        let bucket_index = self.bucket_mask & (hash_value as usize);
+        // We are sure the `bucket_index` is legal
+        unsafe {
+            let index_ref = self.buckets.get_unchecked_mut(bucket_index);
+            let prev_index = *index_ref;
+            // chained list at index is already initialized with 0
+            // 0 meaning end of list
+            *index_ref = index as u64 + 1;
+            if prev_index != 0 {
+                // Update chained Vec at index with previous value
+                self.next[index] = prev_index;
+            }
+        }
+    }
+
+    #[inline]
+    pub(crate) fn get_first_index(&self, hash_value: u64) -> u64 {
+        let bucket_index = self.bucket_mask & (hash_value as usize);
+        unsafe { *self.buckets.get_unchecked(bucket_index) }
+    }
 }
 
+impl fmt::Debug for JoinHashMap {

Review Comment:
   Is this needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to