This is an automated email from the ASF dual-hosted git repository. dheres pushed a commit to branch bucketing in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit 6aad5e2790a9339361ea3ac3b4afb8aa7a396fb8 Author: Daniël Heres <[email protected]> AuthorDate: Wed Jun 28 20:43:40 2023 +0200 Bucketed hash join --- .../core/src/physical_plan/joins/hash_join.rs | 70 +++++++--------------- .../src/physical_plan/joins/hash_join_utils.rs | 5 +- testing | 2 +- 3 files changed, 27 insertions(+), 50 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index a3c553c9b3..672c7dedd6 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -571,26 +571,17 @@ pub fn update_hash( let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; // insert hashes to key of the hashmap - for (row, hash_value) in hash_values.iter().enumerate() { - let item = hash_map - .map - .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); - if let Some((_, index)) = item { - // Already exists: add index to next array - let prev_index = *index; - // Store new value inside hashmap - *index = (row + offset + 1) as u64; + if hash_map.map.len() > 0 { + for (row, hash_value) in hash_values.iter().enumerate() { + let bucket = (*hash_value % (hash_map.map.len() as u64)) as usize; + + let prev_index = hash_map.map[bucket]; + + hash_map.map[bucket] = (row + offset + 1) as u64; + // Update chained Vec at row + offset with previous value hash_map.next[row + offset] = prev_index; - } else { - hash_map.map.insert( - *hash_value, - // store the value + 1 as 0 value reserved for end of list - (*hash_value, (row + offset + 1) as u64), - |(hash, _)| *hash, - ); - // chained list at (row + offset) is already initialized with 0 - // meaning end of list + } } Ok(()) @@ -696,27 +687,22 @@ pub fn build_equal_condition_join_indices( let mut build_indices = UInt64BufferBuilder::new(0); let mut probe_indices = UInt32BufferBuilder::new(0); // Visit all of the probe rows - for (row, hash_value) in 
hash_values.iter().enumerate() { - // Get the hash and find it in the build index - - // For every item on the build and probe we check if it matches - // This possibly contains rows with hash collisions, - // So we have to check here whether rows are equal or not - if let Some((_, index)) = build_hashmap - .map - .get(*hash_value, |(hash, _)| *hash_value == *hash) - { - let mut i = *index - 1; - loop { + if build_hashmap.map.len() > 0 { + for (row, hash_value) in hash_values.iter().enumerate() { + // Get the hash and find it in the build index + + let bucket = (*hash_value % (build_hashmap.map.len() as u64)) as usize; + + // For every item on the build and probe we check if it matches + // This possibly contains rows with hash collisions, + // So we have to check here whether rows are equal or not + let mut next = build_hashmap.map[bucket]; + while next != 0 { + let i = next - 1; build_indices.append(i); probe_indices.append(row as u32); // Follow the chain to get the next index value - let next = build_hashmap.next[i as usize]; - if next == 0 { - // end of list - break; - } - i = next - 1; + next = build_hashmap.next[i as usize]; } } } @@ -1310,7 +1296,6 @@ mod tests { use datafusion_common::ScalarValue; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::Literal; - use hashbrown::raw::RawTable; use crate::execution::context::SessionConfig; use crate::physical_expr::expressions::BinaryExpr; @@ -1321,7 +1306,6 @@ mod tests { physical_plan::{ common, expressions::Column, - hash_utils::create_hashes, joins::{hash_join::build_equal_condition_join_indices, utils::JoinSide}, memory::MemoryExec, repartition::RepartitionExec, @@ -2651,7 +2635,6 @@ mod tests { #[test] fn join_with_hash_collision() -> Result<()> { - let mut hashmap_left = RawTable::with_capacity(2); let left = build_table_i32( ("a", &vec![10, 20]), ("x", &vec![100, 200]), @@ -2659,13 +2642,6 @@ mod tests { ); let random_state = RandomState::with_seeds(0, 0, 0, 0); - let hashes_buff = 
&mut vec![0; left.num_rows()]; - let hashes = - create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?; - - // Create hash collisions (same hashes) - hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h); - hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h); let next = vec![2, 0]; @@ -2677,7 +2653,7 @@ mod tests { let left_data = ( JoinHashMap { - map: hashmap_left, + map: vec![1], next, }, left, diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs index 1b9cbd543d..b3ac1f392b 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs @@ -91,7 +91,7 @@ use datafusion_common::Result; // https://github.com/apache/arrow-datafusion/issues/50 pub struct JoinHashMap { // Stores hash value to first index - pub map: RawTable<(u64, u64)>, + pub map: Vec<u64>, // Stores indices in chained list data structure pub next: Vec<u64>, } @@ -103,7 +103,8 @@ pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>); impl JoinHashMap { pub(crate) fn with_capacity(capacity: usize) -> Self { JoinHashMap { - map: RawTable::with_capacity(capacity), + // Overallocate using 2 x the buckets + map: vec![0; capacity * 2], next: vec![0; capacity], } } diff --git a/testing b/testing index e81d0c6de3..5bab2f264a 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit e81d0c6de35948b3be7984af8e00413b314cde6e +Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88
