This is an automated email from the ASF dual-hosted git repository. dheres pushed a commit to branch adapt_datastructure in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit 763c24d6c8658809ebba724e9835e3efda9862b5 Author: Daniël Heres <[email protected]> AuthorDate: Thu Jun 15 16:45:05 2023 +0200 Change HashJoin datastructure --- .../core/src/physical_plan/joins/hash_join.rs | 41 ++++++++++++++-------- .../src/physical_plan/joins/hash_join_utils.rs | 18 ++++++---- .../src/physical_plan/joins/symmetric_hash_join.rs | 28 +++++++++++++-- 3 files changed, 62 insertions(+), 25 deletions(-) diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 0e62540d6d..f097e913d8 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -43,8 +43,6 @@ use arrow::{ util::bit_util, }; use futures::{ready, Stream, StreamExt, TryStreamExt}; -use hashbrown::raw::RawTable; -use smallvec::smallvec; use std::fmt; use std::sync::Arc; use std::task::Poll; @@ -518,9 +516,9 @@ async fn collect_left_input( reservation.try_grow(estimated_hastable_size)?; metrics.build_mem_used.add(estimated_hastable_size); - let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows)); + let mut hashmap = JoinHashMap::with_capacity(num_rows); let mut hashes_buffer = Vec::new(); - let mut offset = 0; + let mut offset = 1; for batch in batches.iter() { hashes_buffer.clear(); hashes_buffer.resize(batch.num_rows(), 0); @@ -565,14 +563,20 @@ pub fn update_hash( let item = hash_map .0 .get_mut(*hash_value, |(hash, _)| *hash_value == *hash); - if let Some((_, indices)) = item { - indices.push((row + offset) as u64); + if let Some((_, index)) = item { + // Already exists: add index to next array + let prev_index = *index; + *index = (row + offset) as u64; + // update chained Vec + hash_map.1[*index as usize] = prev_index; + } else { hash_map.0.insert( *hash_value, - (*hash_value, smallvec![(row + offset) as u64]), + (*hash_value, (row + offset) as u64), |(hash, _)| *hash, ); + // chained list is initalized with 0 } } Ok(()) @@ -727,13 
+731,13 @@ pub fn build_equal_condition_join_indices( // For every item on the build and probe we check if it matches // This possibly contains rows with hash collisions, // So we have to check here whether rows are equal or not - if let Some((_, indices)) = build_hashmap + if let Some((_, index)) = build_hashmap .0 .get(*hash_value, |(hash, _)| *hash_value == *hash) { - for &i in indices { - // Check hash collisions - let offset_build_index = i as usize - offset_value; + let mut i = *index; + loop { + let offset_build_index = i as usize - offset_value - 1; // Check hash collisions if equal_rows( offset_build_index, @@ -745,6 +749,11 @@ pub fn build_equal_condition_join_indices( build_indices.append(offset_build_index as u64); probe_indices.append(row as u32); } + if build_hashmap.1[i as usize] != 0 { + i = build_hashmap.1[i as usize]; + } else { + break; + } } } } @@ -1258,11 +1267,11 @@ mod tests { use arrow::array::{ArrayRef, Date32Array, Int32Array, UInt32Builder, UInt64Builder}; use arrow::datatypes::{DataType, Field, Schema}; - use smallvec::smallvec; use datafusion_common::ScalarValue; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::Literal; + use hashbrown::raw::RawTable; use crate::execution::context::SessionConfig; use crate::physical_expr::expressions::BinaryExpr; @@ -2616,8 +2625,10 @@ mod tests { create_hashes(&[left.columns()[0].clone()], &random_state, hashes_buff)?; // Create hash collisions (same hashes) - hashmap_left.insert(hashes[0], (hashes[0], smallvec![0, 1]), |(h, _)| *h); - hashmap_left.insert(hashes[1], (hashes[1], smallvec![0, 1]), |(h, _)| *h); + hashmap_left.insert(hashes[0], (hashes[0], 1), |(h, _)| *h); + hashmap_left.insert(hashes[1], (hashes[1], 1), |(h, _)| *h); + + let next = vec![0, 2, 0]; let right = build_table_i32( ("a", &vec![10, 20]), @@ -2625,7 +2636,7 @@ mod tests { ("c", &vec![30, 40]), ); - let left_data = (JoinHashMap(hashmap_left), left); + let left_data = (JoinHashMap(hashmap_left, next), 
left); let (l, r) = build_equal_condition_join_indices( &left_data.0, &left_data.1, diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs index 992de86dfe..59db05164d 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs @@ -30,13 +30,13 @@ use datafusion_physical_expr::intervals::Interval; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use hashbrown::raw::RawTable; -use smallvec::SmallVec; use crate::physical_plan::joins::utils::{JoinFilter, JoinSide}; use datafusion_common::Result; // Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. -// +// The indices (values) are stored in a separate chained list based on (index, next). +// The first item in the list is reserved. // Note that the `u64` keys are not stored in the hashmap (hence the `()` as key), but are only used // to put the indices in a certain bucket. // By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side, @@ -47,9 +47,13 @@ use datafusion_common::Result; // but the values don't match. Those are checked in the [equal_rows] macro // TODO: speed up collision check and move away from using a hashbrown HashMap // https://github.com/apache/arrow-datafusion/issues/50 -pub struct JoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>); +pub struct JoinHashMap(pub RawTable<(u64, u64)>, pub Vec<u64>); impl JoinHashMap { + pub(crate) fn with_capacity(capacity: usize) -> JoinHashMap { + JoinHashMap(RawTable::with_capacity(capacity), vec![0; capacity + 1]) + } + /// In this implementation, the scale_factor variable determines how conservative the shrinking strategy is. /// The value of scale_factor is set to 4, which means the capacity will be reduced by 25% /// when necessary. 
You can adjust the scale_factor value to achieve the desired @@ -67,10 +71,11 @@ impl JoinHashMap { let new_capacity = (capacity * (scale_factor - 1)) / scale_factor; self.0.shrink_to(new_capacity, |(hash, _)| *hash) } + // todo handle chained list } pub(crate) fn size(&self) -> usize { - self.0.allocation_info().1.size() + self.0.allocation_info().1.size() + self.1.capacity() * 16 + 16 } } @@ -290,7 +295,6 @@ pub mod tests { use datafusion_common::ScalarValue; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{binary, cast, col, lit}; - use smallvec::smallvec; use std::sync::Arc; /// Filter expr for a + b > c + 10 AND a + b < c + 100 @@ -628,14 +632,14 @@ pub mod tests { #[test] fn test_shrink_if_necessary() { let scale_factor = 4; - let mut join_hash_map = JoinHashMap(RawTable::with_capacity(100)); + let mut join_hash_map = JoinHashMap::with_capacity(100); let data_size = 2000; let deleted_part = 3 * data_size / 4; // Add elements to the JoinHashMap for hash_value in 0..data_size { join_hash_map.0.insert( hash_value, - (hash_value, smallvec![hash_value]), + (hash_value, hash_value), |(hash, _)| *hash, ); } diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index 7eac619687..80ea9e65dc 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -702,8 +702,30 @@ fn prune_hash_values( .0 .get_mut(*hash_value, |(hash, _)| hash_value == hash) { - separation_chain.retain(|n| !index_set.contains(n)); - if separation_chain.is_empty() { + let mut size = 0; + let mut i = separation_chain; + let mut prev_i = i; + + let mut keep = false; + + // TODO + // loop { + // if !index_set.contains(i) { + // if !keep { + // *prev_i = i; + // } + // // retain this value + // keep = true; + // size += 1; + // } + // // drop value + // *prev_i = i; + + // if *i == 0 { + // break; + // } + 
// } + if size == 0 { hashmap .0 .remove_entry(*hash_value, |(hash, _)| hash_value == hash); @@ -1076,7 +1098,7 @@ impl OneSideHashJoiner { build_side, input_buffer: RecordBatch::new_empty(schema), on, - hashmap: JoinHashMap(RawTable::with_capacity(0)), + hashmap: JoinHashMap::with_capacity(0), row_hash_values: VecDeque::new(), hashes_buffer: vec![], visited_rows: HashSet::new(),
