This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new da8a244be2 Merge hash table implementations and remove leftover utilities (#7366)
da8a244be2 is described below
commit da8a244be23db5816b792d3ea33168fc4c1a4831
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Wed Aug 23 22:47:37 2023 +0300
Merge hash table implementations and remove leftover utilities (#7366)
* Initial for tracking
* Before clippy
* Single implementation
* As any mut impl.
* Review changes
* Leftovers
* Update Cargo.toml
* Void implementation
* Update datafusion/core/src/physical_plan/joins/hash_join_utils.rs
Co-authored-by: Daniël Heres <[email protected]>
* Code corrections
* Remove unused imports
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
Co-authored-by: Daniël Heres <[email protected]>
---
datafusion-cli/Cargo.lock | 1 -
datafusion/core/Cargo.toml | 1 -
.../core/src/physical_plan/joins/hash_join.rs | 409 +++------------------
.../src/physical_plan/joins/hash_join_utils.rs | 249 ++++++++++---
.../src/physical_plan/joins/symmetric_hash_join.rs | 327 +++++-----------
5 files changed, 328 insertions(+), 659 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index b05a4200ac..3bdb9a3772 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1082,7 +1082,6 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"rand",
- "smallvec",
"sqlparser",
"tempfile",
"tokio",
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index a91da9fbf4..90ff8b644e 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -85,7 +85,6 @@ parquet = { workspace = true }
percent-encoding = "2.2.0"
pin-project-lite = "^0.2.7"
rand = "0.8"
-smallvec = { version = "1.6", features = ["union"] }
sqlparser = { workspace = true }
tempfile = "3"
tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 0928998ec8..f7d257e324 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -36,7 +36,7 @@ use crate::physical_plan::{
expressions::Column,
expressions::PhysicalSortExpr,
hash_utils::create_hashes,
- joins::hash_join_utils::JoinHashMap,
+ joins::hash_join_utils::{JoinHashMap, JoinHashMapType},
joins::utils::{
adjust_right_output_partitioning, build_join_schema, check_join_is_valid,
combine_join_equivalence_properties, estimate_join_statistics,
@@ -53,29 +53,17 @@ use super::{
PartitionMode,
};
+use arrow::array::{
+ Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, UInt32Array,
+ UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder,
+};
use arrow::buffer::BooleanBuffer;
use arrow::compute::{and, eq_dyn, is_null, or_kleene, take, FilterBuilder};
+use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
-use arrow::{
- array::{
- Array, ArrayRef, BooleanArray, BooleanBufferBuilder, Date32Array, Date64Array,
- Decimal128Array, DictionaryArray, FixedSizeBinaryArray, Float32Array,
- Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray,
- PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray,
- Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
- TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
- UInt16Array, UInt32Array, UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder,
- UInt8Array,
- },
- datatypes::{
- ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type, Int8Type, Schema,
- SchemaRef, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
- },
- util::bit_util,
-};
+use arrow::util::bit_util;
use arrow_array::cast::downcast_array;
use arrow_schema::ArrowError;
-use datafusion_common::cast::{as_dictionary_array, as_string_array};
use datafusion_common::{
exec_err, internal_err, plan_err, DataFusionError, JoinType, Result,
};
@@ -600,6 +588,7 @@ async fn collect_left_input(
offset,
&random_state,
&mut hashes_buffer,
+ 0,
)?;
offset += batch.num_rows();
}
@@ -612,14 +601,18 @@ async fn collect_left_input(
/// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`,
/// assuming that the [RecordBatch] corresponds to the `index`th
-pub fn update_hash(
+pub fn update_hash<T>(
on: &[Column],
batch: &RecordBatch,
- hash_map: &mut JoinHashMap,
+ hash_map: &mut T,
offset: usize,
random_state: &RandomState,
hashes_buffer: &mut Vec<u64>,
-) -> Result<()> {
+ deleted_offset: usize,
+) -> Result<()>
+where
+ T: JoinHashMapType,
+{
// evaluate the keys
let keys_values = on
.iter()
@@ -629,20 +622,22 @@ pub fn update_hash(
// calculate the hash values
let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
+ // For the regular JoinHashMap, this extension is a no-op (void implementation).
+ hash_map.extend_zero(batch.num_rows());
+
// insert hashes to key of the hashmap
+ let (mut_map, mut_list) = hash_map.get_mut();
for (row, hash_value) in hash_values.iter().enumerate() {
- let item = hash_map
- .map
- .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
+ let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
if let Some((_, index)) = item {
// Already exists: add index to next array
let prev_index = *index;
// Store new value inside hashmap
*index = (row + offset + 1) as u64;
// Update chained Vec at row + offset with previous value
- hash_map.next[row + offset] = prev_index;
+ mut_list[row + offset - deleted_offset] = prev_index;
} else {
- hash_map.map.insert(
+ mut_map.insert(
*hash_value,
// store the value + 1 as 0 value reserved for end of list
(*hash_value, (row + offset + 1) as u64),
@@ -725,8 +720,8 @@ impl RecordBatchStream for HashJoinStream {
// Build indices: 4, 5, 6, 6
// Probe indices: 3, 3, 4, 5
#[allow(clippy::too_many_arguments)]
-pub fn build_equal_condition_join_indices(
- build_hashmap: &JoinHashMap,
+pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
+ build_hashmap: &T,
build_input_buffer: &RecordBatch,
probe_batch: &RecordBatch,
build_on: &[Column],
@@ -736,6 +731,7 @@ pub fn build_equal_condition_join_indices(
hashes_buffer: &mut Vec<u64>,
filter: Option<&JoinFilter>,
build_side: JoinSide,
+ deleted_offset: Option<usize>,
) -> Result<(UInt64Array, UInt32Array)> {
let keys_values = probe_on
.iter()
@@ -783,22 +779,33 @@ pub fn build_equal_condition_join_indices(
// (5,1)
//
// With this approach, the lexicographic order on both the probe side and the build side is preserved.
+ let hash_map = build_hashmap.get_map();
+ let next_chain = build_hashmap.get_list();
for (row, hash_value) in hash_values.iter().enumerate().rev() {
// Get the hash and find it in the build index
// For every item on the build and probe we check if it matches
// This possibly contains rows with hash collisions,
// So we have to check here whether rows are equal or not
- if let Some((_, index)) = build_hashmap
- .map
- .get(*hash_value, |(hash, _)| *hash_value == *hash)
+ if let Some((_, index)) =
+ hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash)
{
let mut i = *index - 1;
loop {
- build_indices.append(i);
+ let build_row_value = if let Some(offset) = deleted_offset {
+ // A provided offset means that chain indices below it were pruned earlier.
+ if i < offset as u64 {
+ // End of the list due to pruning
+ break;
+ }
+ i - offset as u64
+ } else {
+ i
+ };
+ build_indices.append(build_row_value);
probe_indices.append(row as u32);
// Follow the chain to get the next index value
- let next = build_hashmap.next[i as usize];
+ let next = next_chain[build_row_value as usize];
if next == 0 {
// end of list
break;
@@ -837,338 +844,6 @@ pub fn build_equal_condition_join_indices(
)
}
-macro_rules! equal_rows_elem {
- ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident, $null_equals_null: ident) => {{
- let left_array = $l.as_any().downcast_ref::<$array_type>().unwrap();
- let right_array = $r.as_any().downcast_ref::<$array_type>().unwrap();
-
- match (left_array.is_null($left), right_array.is_null($right)) {
- (false, false) => left_array.value($left) == right_array.value($right),
- (true, true) => $null_equals_null,
- _ => false,
- }
- }};
-}
-
-macro_rules! equal_rows_elem_with_string_dict {
- ($key_array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident, $null_equals_null: ident) => {{
- let left_array: &DictionaryArray<$key_array_type> =
- as_dictionary_array::<$key_array_type>($l).unwrap();
- let right_array: &DictionaryArray<$key_array_type> =
- as_dictionary_array::<$key_array_type>($r).unwrap();
-
- let (left_values, left_values_index) = {
- let keys_col = left_array.keys();
- if keys_col.is_valid($left) {
- let values_index = keys_col
- .value($left)
- .to_usize()
- .expect("Can not convert index to usize in dictionary");
-
- (
- as_string_array(left_array.values()).unwrap(),
- Some(values_index),
- )
- } else {
- (as_string_array(left_array.values()).unwrap(), None)
- }
- };
- let (right_values, right_values_index) = {
- let keys_col = right_array.keys();
- if keys_col.is_valid($right) {
- let values_index = keys_col
- .value($right)
- .to_usize()
- .expect("Can not convert index to usize in dictionary");
-
- (
- as_string_array(right_array.values()).unwrap(),
- Some(values_index),
- )
- } else {
- (as_string_array(right_array.values()).unwrap(), None)
- }
- };
-
- match (left_values_index, right_values_index) {
- (Some(left_values_index), Some(right_values_index)) => {
- left_values.value(left_values_index)
- == right_values.value(right_values_index)
- }
- (None, None) => $null_equals_null,
- _ => false,
- }
- }};
-}
-
-/// Left and right row have equal values
-/// If more data types are supported here, please also add the data types in can_hash function
-/// to generate hash join logical plan.
-pub fn equal_rows(
- left: usize,
- right: usize,
- left_arrays: &[ArrayRef],
- right_arrays: &[ArrayRef],
- null_equals_null: bool,
-) -> Result<bool> {
- let mut err = None;
- let res = left_arrays
- .iter()
- .zip(right_arrays)
- .all(|(l, r)| match l.data_type() {
- DataType::Null => {
- // lhs and rhs are both `DataType::Null`, so the equal result
- // is dependent on `null_equals_null`
- null_equals_null
- }
- DataType::Boolean => {
- equal_rows_elem!(BooleanArray, l, r, left, right, null_equals_null)
- }
- DataType::Int8 => {
- equal_rows_elem!(Int8Array, l, r, left, right, null_equals_null)
- }
- DataType::Int16 => {
- equal_rows_elem!(Int16Array, l, r, left, right, null_equals_null)
- }
- DataType::Int32 => {
- equal_rows_elem!(Int32Array, l, r, left, right, null_equals_null)
- }
- DataType::Int64 => {
- equal_rows_elem!(Int64Array, l, r, left, right, null_equals_null)
- }
- DataType::UInt8 => {
- equal_rows_elem!(UInt8Array, l, r, left, right, null_equals_null)
- }
- DataType::UInt16 => {
- equal_rows_elem!(UInt16Array, l, r, left, right, null_equals_null)
- }
- DataType::UInt32 => {
- equal_rows_elem!(UInt32Array, l, r, left, right, null_equals_null)
- }
- DataType::UInt64 => {
- equal_rows_elem!(UInt64Array, l, r, left, right, null_equals_null)
- }
- DataType::Float32 => {
- equal_rows_elem!(Float32Array, l, r, left, right, null_equals_null)
- }
- DataType::Float64 => {
- equal_rows_elem!(Float64Array, l, r, left, right, null_equals_null)
- }
- DataType::Date32 => {
- equal_rows_elem!(Date32Array, l, r, left, right, null_equals_null)
- }
- DataType::Date64 => {
- equal_rows_elem!(Date64Array, l, r, left, right, null_equals_null)
- }
- DataType::Time32(time_unit) => match time_unit {
- TimeUnit::Second => {
- equal_rows_elem!(Time32SecondArray, l, r, left, right, null_equals_null)
- }
- TimeUnit::Millisecond => {
- equal_rows_elem!(Time32MillisecondArray, l, r, left, right, null_equals_null)
- }
- _ => {
- err = Some(internal_err!(
- "Unsupported data type in hasher"
- ));
- false
- }
- }
- DataType::Time64(time_unit) => match time_unit {
- TimeUnit::Microsecond => {
- equal_rows_elem!(Time64MicrosecondArray, l, r, left, right, null_equals_null)
- }
- TimeUnit::Nanosecond => {
- equal_rows_elem!(Time64NanosecondArray, l, r, left, right, null_equals_null)
- }
- _ => {
- err = Some(internal_err!(
- "Unsupported data type in hasher"
- ));
- false
- }
- }
- DataType::Timestamp(time_unit, None) => match time_unit {
- TimeUnit::Second => {
- equal_rows_elem!(
- TimestampSecondArray,
- l,
- r,
- left,
- right,
- null_equals_null
- )
- }
- TimeUnit::Millisecond => {
- equal_rows_elem!(
- TimestampMillisecondArray,
- l,
- r,
- left,
- right,
- null_equals_null
- )
- }
- TimeUnit::Microsecond => {
- equal_rows_elem!(
- TimestampMicrosecondArray,
- l,
- r,
- left,
- right,
- null_equals_null
- )
- }
- TimeUnit::Nanosecond => {
- equal_rows_elem!(
- TimestampNanosecondArray,
- l,
- r,
- left,
- right,
- null_equals_null
- )
- }
- },
- DataType::Utf8 => {
- equal_rows_elem!(StringArray, l, r, left, right, null_equals_null)
- }
- DataType::LargeUtf8 => {
- equal_rows_elem!(LargeStringArray, l, r, left, right, null_equals_null)
- }
- DataType::FixedSizeBinary(_) => {
- equal_rows_elem!(FixedSizeBinaryArray, l, r, left, right, null_equals_null)
- }
- DataType::Decimal128(_, lscale) => match r.data_type() {
- DataType::Decimal128(_, rscale) => {
- if lscale == rscale {
- equal_rows_elem!(
- Decimal128Array,
- l,
- r,
- left,
- right,
- null_equals_null
- )
- } else {
- err = Some(internal_err!(
- "Inconsistent Decimal data type in hasher, the
scale should be same"
- ));
- false
- }
- }
- _ => {
- err = Some(internal_err!(
- "Unsupported data type in hasher"
- ));
- false
- }
- },
- DataType::Dictionary(key_type, value_type)
- if *value_type.as_ref() == DataType::Utf8 =>
- {
- match key_type.as_ref() {
- DataType::Int8 => {
- equal_rows_elem_with_string_dict!(
- Int8Type,
- l,
- r,
- left,
- right,
- null_equals_null
- )
- }
- DataType::Int16 => {
- equal_rows_elem_with_string_dict!(
- Int16Type,
- l,
- r,
- left,
- right,
- null_equals_null
- )
- }
- DataType::Int32 => {
- equal_rows_elem_with_string_dict!(
- Int32Type,
- l,
- r,
- left,
- right,
- null_equals_null
- )
- }
- DataType::Int64 => {
- equal_rows_elem_with_string_dict!(
- Int64Type,
- l,
- r,
- left,
- right,
- null_equals_null
- )
- }
- DataType::UInt8 => {
- equal_rows_elem_with_string_dict!(
- UInt8Type,
- l,
- r,
- left,
- right,
- null_equals_null
- )
- }
- DataType::UInt16 => {
- equal_rows_elem_with_string_dict!(
- UInt16Type,
- l,
- r,
- left,
- right,
- null_equals_null
- )
- }
- DataType::UInt32 => {
- equal_rows_elem_with_string_dict!(
- UInt32Type,
- l,
- r,
- left,
- right,
- null_equals_null
- )
- }
- DataType::UInt64 => {
- equal_rows_elem_with_string_dict!(
- UInt64Type,
- l,
- r,
- left,
- right,
- null_equals_null
- )
- }
- _ => {
- // should not happen
- err = Some(internal_err!(
- "Unsupported data type in hasher"
- ));
- false
- }
- }
- }
- other => {
- // This is internal because we should have caught this before.
- err = Some(internal_err!(
- "Unsupported data type in hasher: {other}"
- ));
- false
- }
- });
-
- err.unwrap_or(Ok(res))
-}
-
// version of eq_dyn supporting equality on null arrays
fn eq_dyn_null(
left: &dyn Array,
@@ -1297,6 +972,7 @@ impl HashJoinStream {
&mut hashes_buffer,
self.filter.as_ref(),
JoinSide::Left,
+ None,
);
let result = match left_right_indices {
@@ -2759,6 +2435,7 @@ mod tests {
&mut vec![0; right.num_rows()],
None,
JoinSide::Left,
+ None,
)?;
let mut left_ids = UInt64Builder::with_capacity(0);
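For readers following the hunks above: both the build phase (`update_hash`) and the probe phase (`build_equal_condition_join_indices`) now share a single chained-list encoding. Below is a minimal, self-contained sketch of that encoding; it is illustrative only, not part of the commit, and it uses a std `HashMap` as a stand-in for `hashbrown::raw::RawTable`.

use std::collections::HashMap;

fn main() {
    // Per-row hash values of the build-side join keys (toy data).
    let hashes: Vec<u64> = vec![10, 20, 10, 10, 20];
    let mut map: HashMap<u64, u64> = HashMap::new();
    let mut next: Vec<u64> = vec![0; hashes.len()];

    // Build phase, mirroring `update_hash` with offset = 0 and no deleted
    // rows: the map stores `row + 1` for the latest row seen with a hash,
    // and `next[row]` chains back to the previous such row (0 terminates).
    for (row, hash) in hashes.iter().enumerate() {
        if let Some(index) = map.get_mut(hash) {
            next[row] = *index; // chain to the previous occurrence
            *index = (row + 1) as u64; // new head of the chain
        } else {
            map.insert(*hash, (row + 1) as u64);
        }
    }

    // Probe phase, mirroring the loop in `build_equal_condition_join_indices`.
    let mut matched_rows = Vec::new();
    let mut i = map[&10] - 1;
    loop {
        matched_rows.push(i);
        let next_value = next[i as usize];
        if next_value == 0 {
            break; // end of the chain
        }
        i = next_value - 1;
    }
    assert_eq!(matched_rows, vec![3, 2, 0]); // rows with hash 10, newest first
}
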
diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
index 37790e6bb8..b80413b53d 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -18,30 +18,29 @@
//! This file contains common subroutines for regular and symmetric hash join
//! related functionality, used both in join calculations and optimization rules.
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
+use std::fmt::{Debug, Formatter};
+use std::ops::IndexMut;
use std::sync::Arc;
use std::{fmt, usize};
-use arrow::datatypes::{ArrowNativeType, SchemaRef};
+use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
+use crate::physical_plan::ExecutionPlan;
use arrow::compute::concat_batches;
+use arrow::datatypes::{ArrowNativeType, SchemaRef};
use arrow_array::builder::BooleanBufferBuilder;
use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, RecordBatch};
use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_common::{DataFusionError, ScalarValue};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalBound};
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
+
use hashbrown::raw::RawTable;
use hashbrown::HashSet;
use parking_lot::Mutex;
-use smallvec::SmallVec;
-use std::fmt::{Debug, Formatter};
-
-use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
-use crate::physical_plan::ExecutionPlan;
-use datafusion_common::Result;
// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side,
@@ -94,20 +93,15 @@ use datafusion_common::Result;
// ---------------------
// | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1)
// ---------------------
-
// TODO: speed up collision checks
// https://github.com/apache/arrow-datafusion/issues/50
pub struct JoinHashMap {
- // Stores hash value to first index
+ // Stores hash value to last row index
pub map: RawTable<(u64, u64)>,
// Stores indices in chained list data structure
pub next: Vec<u64>,
}
-/// SymmetricJoinHashMap is similar to JoinHashMap, except that it stores the indices inline, allowing it to mutate
-/// and shrink the indices.
-pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
-
impl JoinHashMap {
pub(crate) fn with_capacity(capacity: usize) -> Self {
JoinHashMap {
@@ -117,37 +111,201 @@ impl JoinHashMap {
}
}
-impl SymmetricJoinHashMap {
+/// Trait defining methods that must be implemented by a hash map type to be used for joins.
+pub trait JoinHashMapType {
+ /// The type of list used to store the hash values.
+ type NextType: IndexMut<usize, Output = u64>;
+ /// Extend with zero
+ fn extend_zero(&mut self, len: usize);
+ /// Returns mutable references to the hash map and the next.
+ fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType);
+ /// Returns a reference to the hash map.
+ fn get_map(&self) -> &RawTable<(u64, u64)>;
+ /// Returns a reference to the next.
+ fn get_list(&self) -> &Self::NextType;
+}
+
+/// Implementation of `JoinHashMapType` for `JoinHashMap`.
+impl JoinHashMapType for JoinHashMap {
+ type NextType = Vec<u64>;
+
+ // Void implementation
+ fn extend_zero(&mut self, _: usize) {}
+
+ /// Get mutable references to the hash map and the next.
+ fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) {
+ (&mut self.map, &mut self.next)
+ }
+
+ /// Get a reference to the hash map.
+ fn get_map(&self) -> &RawTable<(u64, u64)> {
+ &self.map
+ }
+
+ /// Get a reference to the next.
+ fn get_list(&self) -> &Self::NextType {
+ &self.next
+ }
+}
+
+/// Implementation of `JoinHashMapType` for `PruningJoinHashMap`.
+impl JoinHashMapType for PruningJoinHashMap {
+ type NextType = VecDeque<u64>;
+
+ // Extend with zero
+ fn extend_zero(&mut self, len: usize) {
+ self.next.resize(self.next.len() + len, 0)
+ }
+
+ /// Get mutable references to the hash map and the next.
+ fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) {
+ (&mut self.map, &mut self.next)
+ }
+
+ /// Get a reference to the hash map.
+ fn get_map(&self) -> &RawTable<(u64, u64)> {
+ &self.map
+ }
+
+ /// Get a reference to the next.
+ fn get_list(&self) -> &Self::NextType {
+ &self.next
+ }
+}
+
+impl fmt::Debug for JoinHashMap {
+ fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
+ Ok(())
+ }
+}
+
+/// The `PruningJoinHashMap` is similar to a regular `JoinHashMap`, but with
+/// the capability of pruning elements in an efficient manner. This structure
+/// is particularly useful for cases where it's necessary to remove elements
+/// from the map based on their buffer order.
+///
+/// # Example
+///
+/// ``` text
+/// Let's continue the example of `JoinHashMap` and then show how `PruningJoinHashMap` would
+/// handle the pruning scenario.
+///
+/// Insert the pair (1,4) into the `PruningJoinHashMap`:
+/// map:
+/// ---------
+/// | 1 | 5 |
+/// | 2 | 3 |
+/// ---------
+/// list:
+/// ---------------------
+/// | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1)
+/// ---------------------
+///
+/// Now, let's prune 3 rows from `PruningJoinHashMap`:
+/// map:
+/// ---------
+/// | 1 | 5 |
+/// ---------
+/// list:
+/// ---------
+/// | 2 | 4 | <--- hash value 1 maps to 2 (5 - 3), 1 (4 - 3), NA (2 - 3) (which means indices values 1,0)
+/// ---------
+///
+/// After pruning, the | 2 | 3 | entry is deleted from `PruningJoinHashMap` since
+/// there are no values left for this key.
+/// ```
+pub struct PruningJoinHashMap {
+ /// Stores hash value to last row index
+ pub map: RawTable<(u64, u64)>,
+ /// Stores indices in chained list data structure
+ pub next: VecDeque<u64>,
+}
+
+impl PruningJoinHashMap {
+ /// Constructs a new `PruningJoinHashMap` with the given capacity.
+ /// Both the map and the list are pre-allocated with the provided capacity.
+ ///
+ /// # Arguments
+ /// * `capacity`: The initial capacity of the hash map.
+ ///
+ /// # Returns
+ /// A new instance of `PruningJoinHashMap`.
pub(crate) fn with_capacity(capacity: usize) -> Self {
- Self(RawTable::with_capacity(capacity))
+ PruningJoinHashMap {
+ map: RawTable::with_capacity(capacity),
+ next: VecDeque::with_capacity(capacity),
+ }
}
- /// In this implementation, the scale_factor variable determines how conservative the shrinking strategy is.
- /// The value of scale_factor is set to 4, which means the capacity will be reduced by 25%
- /// when necessary. You can adjust the scale_factor value to achieve the desired
- /// balance between memory usage and performance.
- //
- // If you increase the scale_factor, the capacity will shrink less aggressively,
- // leading to potentially higher memory usage but fewer resizes.
- // Conversely, if you decrease the scale_factor, the capacity will shrink more aggressively,
- // potentially leading to lower memory usage but more frequent resizing.
+ /// Shrinks the capacity of the hash map, if necessary, based on the
+ /// provided scale factor.
+ ///
+ /// # Arguments
+ /// * `scale_factor`: The scale factor that determines how conservative the
+ /// shrinking strategy is. The capacity will be reduced by 1/`scale_factor`
+ /// when necessary.
+ ///
+ /// # Note
+ /// Increasing the scale factor results in less aggressive capacity shrinking,
+ /// leading to potentially higher memory usage but fewer resizes. Conversely,
+ /// decreasing the scale factor results in more aggressive capacity shrinking,
+ /// potentially leading to lower memory usage but more frequent resizing.
pub(crate) fn shrink_if_necessary(&mut self, scale_factor: usize) {
- let capacity = self.0.capacity();
- let len = self.0.len();
+ let capacity = self.map.capacity();
- if capacity > scale_factor * len {
+ if capacity > scale_factor * self.map.len() {
let new_capacity = (capacity * (scale_factor - 1)) / scale_factor;
- self.0.shrink_to(new_capacity, |(hash, _)| *hash)
+ // Resize the map with the new capacity.
+ self.map.shrink_to(new_capacity, |(hash, _)| *hash)
}
}
+ /// Calculates the size of the `PruningJoinHashMap` in bytes.
+ ///
+ /// # Returns
+ /// The size of the hash map in bytes.
pub(crate) fn size(&self) -> usize {
- self.0.allocation_info().1.size()
+ self.map.allocation_info().1.size()
+ + self.next.capacity() * std::mem::size_of::<u64>()
}
-}
-impl fmt::Debug for JoinHashMap {
- fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
+ /// Removes hash values from the map and the list based on the given pruning
+ /// length and deleting offset.
+ ///
+ /// # Arguments
+ /// * `prune_length`: The number of elements to remove from the list.
+ /// * `deleting_offset`: The offset used to determine which hash values to remove from the map.
+ ///
+ /// # Returns
+ /// A `Result` indicating whether the operation was successful.
+ pub(crate) fn prune_hash_values(
+ &mut self,
+ prune_length: usize,
+ deleting_offset: u64,
+ shrink_factor: usize,
+ ) -> Result<()> {
+ // Remove elements from the list based on the pruning length.
+ self.next.drain(0..prune_length);
+
+ // Calculate the keys that should be removed from the map.
+ let removable_keys = unsafe {
+ self.map
+ .iter()
+ .map(|bucket| bucket.as_ref())
+ .filter_map(|(hash, tail_index)| {
+ (*tail_index < prune_length as u64 + deleting_offset).then_some(*hash)
+ })
+ .collect::<Vec<_>>()
+ };
+
+ // Remove the keys from the map.
+ removable_keys.into_iter().for_each(|hash_value| {
+ self.map
+ .remove_entry(hash_value, |(hash, _)| hash_value == *hash);
+ });
+
+ // Shrink the map if necessary.
+ self.shrink_if_necessary(shrink_factor);
Ok(())
}
}
@@ -682,7 +840,6 @@ pub mod tests {
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{binary, cast, col, lit};
- use smallvec::smallvec;
use std::sync::Arc;
/// Filter expr for a + b > c + 10 AND a + b < c + 100
@@ -1020,40 +1177,40 @@ pub mod tests {
#[test]
fn test_shrink_if_necessary() {
let scale_factor = 4;
- let mut join_hash_map = SymmetricJoinHashMap::with_capacity(100);
+ let mut join_hash_map = PruningJoinHashMap::with_capacity(100);
let data_size = 2000;
let deleted_part = 3 * data_size / 4;
// Add elements to the JoinHashMap
for hash_value in 0..data_size {
- join_hash_map.0.insert(
+ join_hash_map.map.insert(
hash_value,
- (hash_value, smallvec![hash_value]),
+ (hash_value, hash_value),
|(hash, _)| *hash,
);
}
- assert_eq!(join_hash_map.0.len(), data_size as usize);
- assert!(join_hash_map.0.capacity() >= data_size as usize);
+ assert_eq!(join_hash_map.map.len(), data_size as usize);
+ assert!(join_hash_map.map.capacity() >= data_size as usize);
// Remove some elements from the JoinHashMap
for hash_value in 0..deleted_part {
join_hash_map
- .0
+ .map
.remove_entry(hash_value, |(hash, _)| hash_value == *hash);
}
- assert_eq!(join_hash_map.0.len(), (data_size - deleted_part) as usize);
+ assert_eq!(join_hash_map.map.len(), (data_size - deleted_part) as usize);
// Old capacity
- let old_capacity = join_hash_map.0.capacity();
+ let old_capacity = join_hash_map.map.capacity();
// Test shrink_if_necessary
join_hash_map.shrink_if_necessary(scale_factor);
// The capacity should be reduced by the scale factor
let new_expected_capacity =
- join_hash_map.0.capacity() * (scale_factor - 1) / scale_factor;
- assert!(join_hash_map.0.capacity() >= new_expected_capacity);
- assert!(join_hash_map.0.capacity() <= old_capacity);
+ join_hash_map.map.capacity() * (scale_factor - 1) / scale_factor;
+ assert!(join_hash_map.map.capacity() >= new_expected_capacity);
+ assert!(join_hash_map.map.capacity() <= old_capacity);
}
}
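The `NextType` associated type introduced above is what lets the shared `update_hash` and probe loop run unchanged over both map variants: `JoinHashMap` backs its chain with a grow-only `Vec<u64>`, while `PruningJoinHashMap` uses a `VecDeque<u64>` so pruned rows can be drained from the front, as `prune_hash_values` does. A minimal sketch of this design choice follows; it is illustrative only and not part of the commit.

use std::collections::VecDeque;
use std::ops::IndexMut;

// Stand-in for the `NextType` bound: callers only need indexed writes, so
// `Vec<u64>` and `VecDeque<u64>` are interchangeable behind the trait.
fn chain_to_previous<L>(list: &mut L, row: usize, prev: u64)
where
    L: IndexMut<usize, Output = u64>,
{
    list[row] = prev; // identical code path for both storage types
}

fn main() {
    let mut fixed_chain: Vec<u64> = vec![0; 4];
    let mut prunable_chain: VecDeque<u64> = VecDeque::from(vec![0; 4]);

    chain_to_previous(&mut fixed_chain, 2, 1);
    chain_to_previous(&mut prunable_chain, 2, 1);

    // Only the deque variant supports cheap front removal, which is what
    // `prune_hash_values` relies on when it drains `0..prune_length`.
    prunable_chain.drain(0..2);
    assert_eq!(prunable_chain.len(), 2);
}
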
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 3309a39b61..1c664adfbb 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -25,7 +25,6 @@
//! This plan uses the [OneSideHashJoiner] object to facilitate join calculations
//! for both its children.
-use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;
@@ -33,29 +32,15 @@ use std::task::Poll;
use std::vec;
use std::{any::Any, usize};
-use ahash::RandomState;
-use arrow::array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, PrimitiveBuilder};
-use arrow::compute::concat_batches;
-use arrow::datatypes::{Schema, SchemaRef};
-use arrow::record_batch::RecordBatch;
-use arrow_array::builder::{UInt32BufferBuilder, UInt64BufferBuilder};
-use arrow_array::{UInt32Array, UInt64Array};
-use datafusion_physical_expr::hash_utils::create_hashes;
-use datafusion_physical_expr::PhysicalExpr;
-use futures::stream::{select, BoxStream};
-use futures::{Stream, StreamExt};
-use hashbrown::HashSet;
-use parking_lot::Mutex;
-use smallvec::smallvec;
-
-use datafusion_execution::memory_pool::MemoryConsumer;
-use datafusion_physical_expr::intervals::ExprIntervalGraph;
-
use crate::physical_plan::common::SharedMemoryReservation;
+use crate::physical_plan::joins::hash_join::{
+ build_equal_condition_join_indices, update_hash,
+};
use crate::physical_plan::joins::hash_join_utils::{
build_filter_expression_graph, calculate_filter_expr_intervals, combine_two_batches,
convert_sort_expr_with_filter_schema, get_pruning_anti_indices,
get_pruning_semi_indices, record_visited_indices, IntervalCalculatorInnerState,
+ PruningJoinHashMap,
};
use crate::physical_plan::joins::StreamJoinPartitionMode;
use crate::physical_plan::DisplayAs;
@@ -74,14 +59,23 @@ use crate::physical_plan::{
DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
+
+use arrow::array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, PrimitiveBuilder};
+use arrow::compute::concat_batches;
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
use datafusion_common::utils::bisect;
use datafusion_common::{internal_err, plan_err, JoinType};
use datafusion_common::{DataFusionError, Result};
+use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;
+use datafusion_physical_expr::intervals::ExprIntervalGraph;
-use super::hash_join::equal_rows;
-use super::hash_join_utils::SymmetricJoinHashMap;
-use super::utils::apply_join_filter_to_indices;
+use ahash::RandomState;
+use futures::stream::{select, BoxStream};
+use futures::{Stream, StreamExt};
+use hashbrown::HashSet;
+use parking_lot::Mutex;
const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
@@ -620,41 +614,6 @@ impl Stream for SymmetricHashJoinStream {
}
}
-fn prune_hash_values(
- prune_length: usize,
- hashmap: &mut SymmetricJoinHashMap,
- row_hash_values: &mut VecDeque<u64>,
- offset: u64,
-) -> Result<()> {
- // Create a (hash)-(row number set) map
- let mut hash_value_map: HashMap<u64, HashSet<u64>> = HashMap::new();
- for index in 0..prune_length {
- let hash_value = row_hash_values.pop_front().unwrap();
- if let Some(set) = hash_value_map.get_mut(&hash_value) {
- set.insert(offset + index as u64);
- } else {
- let mut set = HashSet::new();
- set.insert(offset + index as u64);
- hash_value_map.insert(hash_value, set);
- }
- }
- for (hash_value, index_set) in hash_value_map.iter() {
- if let Some((_, separation_chain)) = hashmap
- .0
- .get_mut(*hash_value, |(hash, _)| hash_value == hash)
- {
- separation_chain.retain(|n| !index_set.contains(n));
- if separation_chain.is_empty() {
- hashmap
- .0
- .remove_entry(*hash_value, |(hash, _)| hash_value == hash);
- }
- }
- }
- hashmap.shrink_if_necessary(HASHMAP_SHRINK_SCALE_FACTOR);
- Ok(())
-}
-
/// Determine the pruning length for `buffer`.
///
/// This function evaluates the build side filter expression, converts the
@@ -834,144 +793,6 @@ pub(crate) fn build_side_determined_results(
}
}
-/// Gets build and probe indices which satisfy the on condition (including
-/// the equality condition and the join filter) in the join.
-#[allow(clippy::too_many_arguments)]
-pub fn build_join_indices(
- probe_batch: &RecordBatch,
- build_hashmap: &SymmetricJoinHashMap,
- build_input_buffer: &RecordBatch,
- on_build: &[Column],
- on_probe: &[Column],
- filter: Option<&JoinFilter>,
- random_state: &RandomState,
- null_equals_null: bool,
- hashes_buffer: &mut Vec<u64>,
- offset: Option<usize>,
- build_side: JoinSide,
-) -> Result<(UInt64Array, UInt32Array)> {
- // Get the indices that satisfy the equality condition, like `left.a1 = right.a2`
- let (build_indices, probe_indices) = build_equal_condition_join_indices(
- build_hashmap,
- build_input_buffer,
- probe_batch,
- on_build,
- on_probe,
- random_state,
- null_equals_null,
- hashes_buffer,
- offset,
- )?;
- if let Some(filter) = filter {
- // Filter the indices which satisfy the non-equal join condition, like `left.b1 = 10`
- apply_join_filter_to_indices(
- build_input_buffer,
- probe_batch,
- build_indices,
- probe_indices,
- filter,
- build_side,
- )
- } else {
- Ok((build_indices, probe_indices))
- }
-}
-
-// Returns build/probe indices satisfying the equality condition.
-// On LEFT.b1 = RIGHT.b2
-// LEFT Table:
-// a1 b1 c1
-// 1 1 10
-// 3 3 30
-// 5 5 50
-// 7 7 70
-// 9 8 90
-// 11 8 110
-// 13 10 130
-// RIGHT Table:
-// a2 b2 c2
-// 2 2 20
-// 4 4 40
-// 6 6 60
-// 8 8 80
-// 10 10 100
-// 12 10 120
-// The result is
-// "+----+----+-----+----+----+-----+",
-// "| a1 | b1 | c1 | a2 | b2 | c2 |",
-// "+----+----+-----+----+----+-----+",
-// "| 11 | 8 | 110 | 8 | 8 | 80 |",
-// "| 13 | 10 | 130 | 10 | 10 | 100 |",
-// "| 13 | 10 | 130 | 12 | 10 | 120 |",
-// "| 9 | 8 | 90 | 8 | 8 | 80 |",
-// "+----+----+-----+----+----+-----+"
-// And the result of build and probe indices are:
-// Build indices: 5, 6, 6, 4
-// Probe indices: 3, 4, 5, 3
-#[allow(clippy::too_many_arguments)]
-pub fn build_equal_condition_join_indices(
- build_hashmap: &SymmetricJoinHashMap,
- build_input_buffer: &RecordBatch,
- probe_batch: &RecordBatch,
- build_on: &[Column],
- probe_on: &[Column],
- random_state: &RandomState,
- null_equals_null: bool,
- hashes_buffer: &mut Vec<u64>,
- offset: Option<usize>,
-) -> Result<(UInt64Array, UInt32Array)> {
- let keys_values = probe_on
- .iter()
- .map(|c| Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows())))
- .collect::<Result<Vec<_>>>()?;
- let build_join_values = build_on
- .iter()
- .map(|c| {
- Ok(c.evaluate(build_input_buffer)?
- .into_array(build_input_buffer.num_rows()))
- })
- .collect::<Result<Vec<_>>>()?;
- hashes_buffer.clear();
- hashes_buffer.resize(probe_batch.num_rows(), 0);
- let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
- // Using a buffer builder to avoid slower normal builder
- let mut build_indices = UInt64BufferBuilder::new(0);
- let mut probe_indices = UInt32BufferBuilder::new(0);
- let offset_value = offset.unwrap_or(0);
- // Visit all of the probe rows
- for (row, hash_value) in hash_values.iter().enumerate() {
- // Get the hash and find it in the build index
- // For every item on the build and probe we check if it matches
- // This possibly contains rows with hash collisions,
- // So we have to check here whether rows are equal or not
- if let Some((_, indices)) = build_hashmap
- .0
- .get(*hash_value, |(hash, _)| *hash_value == *hash)
- {
- for &i in indices {
- // Check hash collisions
- let offset_build_index = i as usize - offset_value;
- // Check hash collisions
- if equal_rows(
- offset_build_index,
- row,
- &build_join_values,
- &keys_values,
- null_equals_null,
- )? {
- build_indices.append(offset_build_index as u64);
- probe_indices.append(row as u32);
- }
- }
- }
- }
-
- Ok((
- PrimitiveArray::new(build_indices.finish().into(), None),
- PrimitiveArray::new(probe_indices.finish().into(), None),
- ))
-}
-
/// This method performs a join between the build side input buffer and the probe side batch.
///
/// # Arguments
@@ -1006,18 +827,18 @@ pub(crate) fn join_with_probe_batch(
if build_hash_joiner.input_buffer.num_rows() == 0 || probe_batch.num_rows() == 0 {
return Ok(None);
}
- let (build_indices, probe_indices) = build_join_indices(
- probe_batch,
+ let (build_indices, probe_indices) = build_equal_condition_join_indices(
&build_hash_joiner.hashmap,
&build_hash_joiner.input_buffer,
+ probe_batch,
&build_hash_joiner.on,
&probe_hash_joiner.on,
- filter,
random_state,
null_equals_null,
&mut build_hash_joiner.hashes_buffer,
- Some(build_hash_joiner.deleted_offset),
+ filter,
build_hash_joiner.build_side,
+ Some(build_hash_joiner.deleted_offset),
)?;
if need_to_produce_result_in_final(build_hash_joiner.build_side, join_type) {
record_visited_indices(
@@ -1063,9 +884,7 @@ pub struct OneSideHashJoiner {
/// Columns from the side
pub(crate) on: Vec<Column>,
/// Hashmap
- pub(crate) hashmap: SymmetricJoinHashMap,
- /// To optimize hash deleting in case of pruning, we hold them in memory
- row_hash_values: VecDeque<u64>,
+ pub(crate) hashmap: PruningJoinHashMap,
/// Reuse the hashes buffer
pub(crate) hashes_buffer: Vec<u64>,
/// Matched rows
@@ -1084,7 +903,6 @@ impl OneSideHashJoiner {
size += self.input_buffer.get_array_memory_size();
size += std::mem::size_of_val(&self.on);
size += self.hashmap.size();
- size += self.row_hash_values.capacity() * std::mem::size_of::<u64>();
size += self.hashes_buffer.capacity() * std::mem::size_of::<u64>();
size += self.visited_rows.capacity() * std::mem::size_of::<usize>();
size += std::mem::size_of_val(&self.offset);
@@ -1096,8 +914,7 @@ impl OneSideHashJoiner {
build_side,
input_buffer: RecordBatch::new_empty(schema),
on,
- hashmap: SymmetricJoinHashMap::with_capacity(0),
- row_hash_values: VecDeque::new(),
+ hashmap: PruningJoinHashMap::with_capacity(0),
hashes_buffer: vec![],
visited_rows: HashSet::new(),
offset: 0,
@@ -1105,39 +922,6 @@ impl OneSideHashJoiner {
}
}
- pub fn update_hash(
- on: &[Column],
- batch: &RecordBatch,
- hash_map: &mut SymmetricJoinHashMap,
- offset: usize,
- random_state: &RandomState,
- hashes_buffer: &mut Vec<u64>,
- ) -> Result<()> {
- // evaluate the keys
- let keys_values = on
- .iter()
- .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows())))
- .collect::<Result<Vec<_>>>()?;
- // calculate the hash values
- let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;
- // insert hashes to key of the hashmap
- for (row, hash_value) in hash_values.iter().enumerate() {
- let item = hash_map
- .0
- .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
- if let Some((_, indices)) = item {
- indices.push((row + offset) as u64);
- } else {
- hash_map.0.insert(
- *hash_value,
- (*hash_value, smallvec![(row + offset) as u64]),
- |(hash, _)| *hash,
- );
- }
- }
- Ok(())
- }
-
/// Updates the internal state of the [OneSideHashJoiner] with the incoming batch.
///
/// # Arguments
@@ -1159,16 +943,15 @@ impl OneSideHashJoiner {
self.hashes_buffer.resize(batch.num_rows(), 0);
// Get allocation_info before adding the item
// Update the hashmap with the join key values and hashes of the incoming batch:
- Self::update_hash(
+ update_hash(
&self.on,
batch,
&mut self.hashmap,
self.offset,
random_state,
&mut self.hashes_buffer,
+ self.deleted_offset,
)?;
- // Add the hashes buffer to the hash value deque:
- self.row_hash_values.extend(self.hashes_buffer.iter());
Ok(())
}
@@ -1228,11 +1011,10 @@ impl OneSideHashJoiner {
pub(crate) fn prune_internal_state(&mut self, prune_length: usize) -> Result<()> {
// Prune the hash values:
- prune_hash_values(
+ self.hashmap.prune_hash_values(
prune_length,
- &mut self.hashmap,
- &mut self.row_hash_values,
self.deleted_offset as u64,
+ HASHMAP_SHRINK_SCALE_FACTOR,
)?;
// Remove pruned rows from the visited rows set:
for row in self.deleted_offset..(self.deleted_offset + prune_length) {
@@ -1448,7 +1230,7 @@ mod tests {
};
use datafusion_common::ScalarValue;
- const TABLE_SIZE: i32 = 100;
+ const TABLE_SIZE: i32 = 1000;
pub async fn experiment(
left: Arc<dyn ExecutionPlan>,
@@ -1630,6 +1412,61 @@ mod tests {
Ok(())
}
+ #[tokio::test(flavor = "multi_thread")]
+ async fn join_all_one_ascending_numeric_v2() -> Result<()> {
+ let join_type = JoinType::Inner;
+ let cardinality = (4, 5);
+ let case_expr = 2;
+ let task_ctx = Arc::new(TaskContext::default());
+ let (left_batch, right_batch) = build_sides_record_batches(1000, cardinality)?;
+ let left_schema = &left_batch.schema();
+ let right_schema = &right_batch.schema();
+ let left_sorted = vec![PhysicalSortExpr {
+ expr: col("la1", left_schema)?,
+ options: SortOptions::default(),
+ }];
+ let right_sorted = vec![PhysicalSortExpr {
+ expr: col("ra1", right_schema)?,
+ options: SortOptions::default(),
+ }];
+ let (left, right) = create_memory_table(
+ left_batch,
+ right_batch,
+ vec![left_sorted],
+ vec![right_sorted],
+ 13,
+ )?;
+
+ let on = vec![(
+ Column::new_with_schema("lc1", left_schema)?,
+ Column::new_with_schema("rc1", right_schema)?,
+ )];
+
+ let intermediate_schema = Schema::new(vec![
+ Field::new("left", DataType::Int32, true),
+ Field::new("right", DataType::Int32, true),
+ ]);
+ let filter_expr = join_expr_tests_fixture_i32(
+ case_expr,
+ col("left", &intermediate_schema)?,
+ col("right", &intermediate_schema)?,
+ );
+ let column_indices = vec![
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Left,
+ },
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Right,
+ },
+ ];
+ let filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);
+
+ experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
+ Ok(())
+ }
+
#[rstest]
#[tokio::test(flavor = "multi_thread")]
async fn join_without_sort_information(