This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new da8a244be2 Merge hash table implementations and remove leftover 
utilities (#7366)
da8a244be2 is described below

commit da8a244be23db5816b792d3ea33168fc4c1a4831
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Wed Aug 23 22:47:37 2023 +0300

    Merge hash table implementations and remove leftover utilities (#7366)
    
    * Initial for tracking
    
    * Before clippy
    
    * Single implementation
    
    * As any mut impl.
    
    * Review changes
    
    * Leftovers
    
    * Update Cargo.toml
    
    * Void implementation
    
    * Update datafusion/core/src/physical_plan/joins/hash_join_utils.rs
    
    Co-authored-by: Daniël Heres <[email protected]>
    
    * Code corrections
    
    * Remove unused imports
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
    Co-authored-by: Daniël Heres <[email protected]>
---
 datafusion-cli/Cargo.lock                          |   1 -
 datafusion/core/Cargo.toml                         |   1 -
 .../core/src/physical_plan/joins/hash_join.rs      | 409 +++------------------
 .../src/physical_plan/joins/hash_join_utils.rs     | 249 ++++++++++---
 .../src/physical_plan/joins/symmetric_hash_join.rs | 327 +++++-----------
 5 files changed, 328 insertions(+), 659 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index b05a4200ac..3bdb9a3772 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1082,7 +1082,6 @@ dependencies = [
  "percent-encoding",
  "pin-project-lite",
  "rand",
- "smallvec",
  "sqlparser",
  "tempfile",
  "tokio",
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index a91da9fbf4..90ff8b644e 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -85,7 +85,6 @@ parquet = { workspace = true }
 percent-encoding = "2.2.0"
 pin-project-lite = "^0.2.7"
 rand = "0.8"
-smallvec = { version = "1.6", features = ["union"] }
 sqlparser = { workspace = true }
 tempfile = "3"
 tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", 
"sync", "fs", "parking_lot"] }
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs 
b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 0928998ec8..f7d257e324 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -36,7 +36,7 @@ use crate::physical_plan::{
     expressions::Column,
     expressions::PhysicalSortExpr,
     hash_utils::create_hashes,
-    joins::hash_join_utils::JoinHashMap,
+    joins::hash_join_utils::{JoinHashMap, JoinHashMapType},
     joins::utils::{
         adjust_right_output_partitioning, build_join_schema, 
check_join_is_valid,
         combine_join_equivalence_properties, estimate_join_statistics,
@@ -53,29 +53,17 @@ use super::{
     PartitionMode,
 };
 
+use arrow::array::{
+    Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, 
UInt32Array,
+    UInt32BufferBuilder, UInt64Array, UInt64BufferBuilder,
+};
 use arrow::buffer::BooleanBuffer;
 use arrow::compute::{and, eq_dyn, is_null, or_kleene, take, FilterBuilder};
+use arrow::datatypes::{DataType, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use arrow::{
-    array::{
-        Array, ArrayRef, BooleanArray, BooleanBufferBuilder, Date32Array, 
Date64Array,
-        Decimal128Array, DictionaryArray, FixedSizeBinaryArray, Float32Array,
-        Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, 
LargeStringArray,
-        PrimitiveArray, StringArray, Time32MillisecondArray, Time32SecondArray,
-        Time64MicrosecondArray, Time64NanosecondArray, 
TimestampMicrosecondArray,
-        TimestampMillisecondArray, TimestampNanosecondArray, 
TimestampSecondArray,
-        UInt16Array, UInt32Array, UInt32BufferBuilder, UInt64Array, 
UInt64BufferBuilder,
-        UInt8Array,
-    },
-    datatypes::{
-        ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type, Int8Type, 
Schema,
-        SchemaRef, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
-    },
-    util::bit_util,
-};
+use arrow::util::bit_util;
 use arrow_array::cast::downcast_array;
 use arrow_schema::ArrowError;
-use datafusion_common::cast::{as_dictionary_array, as_string_array};
 use datafusion_common::{
     exec_err, internal_err, plan_err, DataFusionError, JoinType, Result,
 };
@@ -600,6 +588,7 @@ async fn collect_left_input(
             offset,
             &random_state,
             &mut hashes_buffer,
+            0,
         )?;
         offset += batch.num_rows();
     }
@@ -612,14 +601,18 @@ async fn collect_left_input(
 
 /// Updates `hash` with new entries from [RecordBatch] evaluated against the 
expressions `on`,
 /// assuming that the [RecordBatch] corresponds to the `index`th
-pub fn update_hash(
+pub fn update_hash<T>(
     on: &[Column],
     batch: &RecordBatch,
-    hash_map: &mut JoinHashMap,
+    hash_map: &mut T,
     offset: usize,
     random_state: &RandomState,
     hashes_buffer: &mut Vec<u64>,
-) -> Result<()> {
+    deleted_offset: usize,
+) -> Result<()>
+where
+    T: JoinHashMapType,
+{
     // evaluate the keys
     let keys_values = on
         .iter()
@@ -629,20 +622,22 @@ pub fn update_hash(
     // calculate the hash values
     let hash_values = create_hashes(&keys_values, random_state, 
hashes_buffer)?;
 
+    // For the usual JoinHashMap, extend_zero is a no-op.
+    hash_map.extend_zero(batch.num_rows());
+
     // insert hashes to key of the hashmap
+    let (mut_map, mut_list) = hash_map.get_mut();
     for (row, hash_value) in hash_values.iter().enumerate() {
-        let item = hash_map
-            .map
-            .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
+        let item = mut_map.get_mut(*hash_value, |(hash, _)| *hash_value == 
*hash);
         if let Some((_, index)) = item {
             // Already exists: add index to next array
             let prev_index = *index;
             // Store new value inside hashmap
             *index = (row + offset + 1) as u64;
             // Update chained Vec at row + offset with previous value
-            hash_map.next[row + offset] = prev_index;
+            mut_list[row + offset - deleted_offset] = prev_index;
         } else {
-            hash_map.map.insert(
+            mut_map.insert(
                 *hash_value,
                 // store the value + 1 as 0 value reserved for end of list
                 (*hash_value, (row + offset + 1) as u64),
@@ -725,8 +720,8 @@ impl RecordBatchStream for HashJoinStream {
 // Build indices: 4, 5, 6, 6
 // Probe indices: 3, 3, 4, 5
 #[allow(clippy::too_many_arguments)]
-pub fn build_equal_condition_join_indices(
-    build_hashmap: &JoinHashMap,
+pub fn build_equal_condition_join_indices<T: JoinHashMapType>(
+    build_hashmap: &T,
     build_input_buffer: &RecordBatch,
     probe_batch: &RecordBatch,
     build_on: &[Column],
@@ -736,6 +731,7 @@ pub fn build_equal_condition_join_indices(
     hashes_buffer: &mut Vec<u64>,
     filter: Option<&JoinFilter>,
     build_side: JoinSide,
+    deleted_offset: Option<usize>,
 ) -> Result<(UInt64Array, UInt32Array)> {
     let keys_values = probe_on
         .iter()
@@ -783,22 +779,33 @@ pub fn build_equal_condition_join_indices(
     //     (5,1)
     //
     // With this approach, the lexicographic order on both the probe side and 
the build side is preserved.
+    let hash_map = build_hashmap.get_map();
+    let next_chain = build_hashmap.get_list();
     for (row, hash_value) in hash_values.iter().enumerate().rev() {
         // Get the hash and find it in the build index
 
         // For every item on the build and probe we check if it matches
         // This possibly contains rows with hash collisions,
         // So we have to check here whether rows are equal or not
-        if let Some((_, index)) = build_hashmap
-            .map
-            .get(*hash_value, |(hash, _)| *hash_value == *hash)
+        if let Some((_, index)) =
+            hash_map.get(*hash_value, |(hash, _)| *hash_value == *hash)
         {
             let mut i = *index - 1;
             loop {
-                build_indices.append(i);
+                let build_row_value = if let Some(offset) = deleted_offset {
+                    // A set deleted_offset means the next chain was pruned before this index.
+                    if i < offset as u64 {
+                        // End of the list due to pruning
+                        break;
+                    }
+                    i - offset as u64
+                } else {
+                    i
+                };
+                build_indices.append(build_row_value);
                 probe_indices.append(row as u32);
                 // Follow the chain to get the next index value
-                let next = build_hashmap.next[i as usize];
+                let next = next_chain[build_row_value as usize];
                 if next == 0 {
                     // end of list
                     break;
@@ -837,338 +844,6 @@ pub fn build_equal_condition_join_indices(
     )
 }
 
-macro_rules! equal_rows_elem {
-    ($array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident, 
$null_equals_null: ident) => {{
-        let left_array = $l.as_any().downcast_ref::<$array_type>().unwrap();
-        let right_array = $r.as_any().downcast_ref::<$array_type>().unwrap();
-
-        match (left_array.is_null($left), right_array.is_null($right)) {
-            (false, false) => left_array.value($left) == 
right_array.value($right),
-            (true, true) => $null_equals_null,
-            _ => false,
-        }
-    }};
-}
-
-macro_rules! equal_rows_elem_with_string_dict {
-    ($key_array_type:ident, $l: ident, $r: ident, $left: ident, $right: ident, 
$null_equals_null: ident) => {{
-        let left_array: &DictionaryArray<$key_array_type> =
-            as_dictionary_array::<$key_array_type>($l).unwrap();
-        let right_array: &DictionaryArray<$key_array_type> =
-            as_dictionary_array::<$key_array_type>($r).unwrap();
-
-        let (left_values, left_values_index) = {
-            let keys_col = left_array.keys();
-            if keys_col.is_valid($left) {
-                let values_index = keys_col
-                    .value($left)
-                    .to_usize()
-                    .expect("Can not convert index to usize in dictionary");
-
-                (
-                    as_string_array(left_array.values()).unwrap(),
-                    Some(values_index),
-                )
-            } else {
-                (as_string_array(left_array.values()).unwrap(), None)
-            }
-        };
-        let (right_values, right_values_index) = {
-            let keys_col = right_array.keys();
-            if keys_col.is_valid($right) {
-                let values_index = keys_col
-                    .value($right)
-                    .to_usize()
-                    .expect("Can not convert index to usize in dictionary");
-
-                (
-                    as_string_array(right_array.values()).unwrap(),
-                    Some(values_index),
-                )
-            } else {
-                (as_string_array(right_array.values()).unwrap(), None)
-            }
-        };
-
-        match (left_values_index, right_values_index) {
-            (Some(left_values_index), Some(right_values_index)) => {
-                left_values.value(left_values_index)
-                    == right_values.value(right_values_index)
-            }
-            (None, None) => $null_equals_null,
-            _ => false,
-        }
-    }};
-}
-
-/// Left and right row have equal values
-/// If more data types are supported here, please also add the data types in 
can_hash function
-/// to generate hash join logical plan.
-pub fn equal_rows(
-    left: usize,
-    right: usize,
-    left_arrays: &[ArrayRef],
-    right_arrays: &[ArrayRef],
-    null_equals_null: bool,
-) -> Result<bool> {
-    let mut err = None;
-    let res = left_arrays
-        .iter()
-        .zip(right_arrays)
-        .all(|(l, r)| match l.data_type() {
-            DataType::Null => {
-                // lhs and rhs are both `DataType::Null`, so the equal result
-                // is dependent on `null_equals_null`
-                null_equals_null
-            }
-            DataType::Boolean => {
-                equal_rows_elem!(BooleanArray, l, r, left, right, 
null_equals_null)
-            }
-            DataType::Int8 => {
-                equal_rows_elem!(Int8Array, l, r, left, right, 
null_equals_null)
-            }
-            DataType::Int16 => {
-                equal_rows_elem!(Int16Array, l, r, left, right, 
null_equals_null)
-            }
-            DataType::Int32 => {
-                equal_rows_elem!(Int32Array, l, r, left, right, 
null_equals_null)
-            }
-            DataType::Int64 => {
-                equal_rows_elem!(Int64Array, l, r, left, right, 
null_equals_null)
-            }
-            DataType::UInt8 => {
-                equal_rows_elem!(UInt8Array, l, r, left, right, 
null_equals_null)
-            }
-            DataType::UInt16 => {
-                equal_rows_elem!(UInt16Array, l, r, left, right, 
null_equals_null)
-            }
-            DataType::UInt32 => {
-                equal_rows_elem!(UInt32Array, l, r, left, right, 
null_equals_null)
-            }
-            DataType::UInt64 => {
-                equal_rows_elem!(UInt64Array, l, r, left, right, 
null_equals_null)
-            }
-            DataType::Float32 => {
-                equal_rows_elem!(Float32Array, l, r, left, right, 
null_equals_null)
-            }
-            DataType::Float64 => {
-                equal_rows_elem!(Float64Array, l, r, left, right, 
null_equals_null)
-            }
-            DataType::Date32 => {
-                equal_rows_elem!(Date32Array, l, r, left, right, 
null_equals_null)
-            }
-            DataType::Date64 => {
-                equal_rows_elem!(Date64Array, l, r, left, right, 
null_equals_null)
-            }
-            DataType::Time32(time_unit) => match time_unit {
-                TimeUnit::Second => {
-                    equal_rows_elem!(Time32SecondArray, l, r, left, right, 
null_equals_null)
-                }
-                TimeUnit::Millisecond => {
-                    equal_rows_elem!(Time32MillisecondArray, l, r, left, 
right, null_equals_null)
-                }
-                _ => {
-                    err = Some(internal_err!(
-                        "Unsupported data type in hasher"
-                    ));
-                    false
-                }
-            }
-            DataType::Time64(time_unit) => match time_unit {
-                TimeUnit::Microsecond => {
-                    equal_rows_elem!(Time64MicrosecondArray, l, r, left, 
right, null_equals_null)
-                }
-                TimeUnit::Nanosecond => {
-                    equal_rows_elem!(Time64NanosecondArray, l, r, left, right, 
null_equals_null)
-                }
-                _ => {
-                    err = Some(internal_err!(
-                        "Unsupported data type in hasher"
-                    ));
-                    false
-                }
-            }
-            DataType::Timestamp(time_unit, None) => match time_unit {
-                TimeUnit::Second => {
-                    equal_rows_elem!(
-                        TimestampSecondArray,
-                        l,
-                        r,
-                        left,
-                        right,
-                        null_equals_null
-                    )
-                }
-                TimeUnit::Millisecond => {
-                    equal_rows_elem!(
-                        TimestampMillisecondArray,
-                        l,
-                        r,
-                        left,
-                        right,
-                        null_equals_null
-                    )
-                }
-                TimeUnit::Microsecond => {
-                    equal_rows_elem!(
-                        TimestampMicrosecondArray,
-                        l,
-                        r,
-                        left,
-                        right,
-                        null_equals_null
-                    )
-                }
-                TimeUnit::Nanosecond => {
-                    equal_rows_elem!(
-                        TimestampNanosecondArray,
-                        l,
-                        r,
-                        left,
-                        right,
-                        null_equals_null
-                    )
-                }
-            },
-            DataType::Utf8 => {
-                equal_rows_elem!(StringArray, l, r, left, right, 
null_equals_null)
-            }
-            DataType::LargeUtf8 => {
-                equal_rows_elem!(LargeStringArray, l, r, left, right, 
null_equals_null)
-            }
-            DataType::FixedSizeBinary(_) => {
-                equal_rows_elem!(FixedSizeBinaryArray, l, r, left, right, 
null_equals_null)
-            }
-            DataType::Decimal128(_, lscale) => match r.data_type() {
-                DataType::Decimal128(_, rscale) => {
-                    if lscale == rscale {
-                        equal_rows_elem!(
-                            Decimal128Array,
-                            l,
-                            r,
-                            left,
-                            right,
-                            null_equals_null
-                        )
-                    } else {
-                        err = Some(internal_err!(
-                            "Inconsistent Decimal data type in hasher, the 
scale should be same"
-                        ));
-                        false
-                    }
-                }
-                _ => {
-                    err = Some(internal_err!(
-                        "Unsupported data type in hasher"
-                    ));
-                    false
-                }
-            },
-            DataType::Dictionary(key_type, value_type)
-            if *value_type.as_ref() == DataType::Utf8 =>
-                {
-                    match key_type.as_ref() {
-                        DataType::Int8 => {
-                            equal_rows_elem_with_string_dict!(
-                            Int8Type,
-                            l,
-                            r,
-                            left,
-                            right,
-                            null_equals_null
-                        )
-                        }
-                        DataType::Int16 => {
-                            equal_rows_elem_with_string_dict!(
-                            Int16Type,
-                            l,
-                            r,
-                            left,
-                            right,
-                            null_equals_null
-                        )
-                        }
-                        DataType::Int32 => {
-                            equal_rows_elem_with_string_dict!(
-                            Int32Type,
-                            l,
-                            r,
-                            left,
-                            right,
-                            null_equals_null
-                        )
-                        }
-                        DataType::Int64 => {
-                            equal_rows_elem_with_string_dict!(
-                            Int64Type,
-                            l,
-                            r,
-                            left,
-                            right,
-                            null_equals_null
-                        )
-                        }
-                        DataType::UInt8 => {
-                            equal_rows_elem_with_string_dict!(
-                            UInt8Type,
-                            l,
-                            r,
-                            left,
-                            right,
-                            null_equals_null
-                        )
-                        }
-                        DataType::UInt16 => {
-                            equal_rows_elem_with_string_dict!(
-                            UInt16Type,
-                            l,
-                            r,
-                            left,
-                            right,
-                            null_equals_null
-                        )
-                        }
-                        DataType::UInt32 => {
-                            equal_rows_elem_with_string_dict!(
-                            UInt32Type,
-                            l,
-                            r,
-                            left,
-                            right,
-                            null_equals_null
-                        )
-                        }
-                        DataType::UInt64 => {
-                            equal_rows_elem_with_string_dict!(
-                            UInt64Type,
-                            l,
-                            r,
-                            left,
-                            right,
-                            null_equals_null
-                        )
-                        }
-                        _ => {
-                            // should not happen
-                            err = Some(internal_err!(
-                                "Unsupported data type in hasher"
-                            ));
-                            false
-                        }
-                    }
-                }
-            other => {
-                // This is internal because we should have caught this before.
-                err = Some(internal_err!(
-                    "Unsupported data type in hasher: {other}"
-                ));
-                false
-            }
-        });
-
-    err.unwrap_or(Ok(res))
-}
-
 // version of eq_dyn supporting equality on null arrays
 fn eq_dyn_null(
     left: &dyn Array,
@@ -1297,6 +972,7 @@ impl HashJoinStream {
                         &mut hashes_buffer,
                         self.filter.as_ref(),
                         JoinSide::Left,
+                        None,
                     );
 
                     let result = match left_right_indices {
@@ -2759,6 +2435,7 @@ mod tests {
             &mut vec![0; right.num_rows()],
             None,
             JoinSide::Left,
+            None,
         )?;
 
         let mut left_ids = UInt64Builder::with_capacity(0);
diff --git a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs 
b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
index 37790e6bb8..b80413b53d 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join_utils.rs
@@ -18,30 +18,29 @@
 //! This file contains common subroutines for regular and symmetric hash join
 //! related functionality, used both in join calculations and optimization 
rules.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
+use std::fmt::{Debug, Formatter};
+use std::ops::IndexMut;
 use std::sync::Arc;
 use std::{fmt, usize};
 
-use arrow::datatypes::{ArrowNativeType, SchemaRef};
+use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
+use crate::physical_plan::ExecutionPlan;
 
 use arrow::compute::concat_batches;
+use arrow::datatypes::{ArrowNativeType, SchemaRef};
 use arrow_array::builder::BooleanBufferBuilder;
 use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, 
RecordBatch};
 use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_common::{DataFusionError, ScalarValue};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, 
IntervalBound};
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
+
 use hashbrown::raw::RawTable;
 use hashbrown::HashSet;
 use parking_lot::Mutex;
-use smallvec::SmallVec;
-use std::fmt::{Debug, Formatter};
-
-use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
-use crate::physical_plan::ExecutionPlan;
-use datafusion_common::Result;
 
 // Maps a `u64` hash value based on the build side ["on" values] to a list of 
indices with this key's value.
 // By allocating a `HashMap` with capacity for *at least* the number of rows 
for entries at the build side,
@@ -94,20 +93,15 @@ use datafusion_common::Result;
 // ---------------------
 // | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices 
values 4,3,1)
 // ---------------------
-
 // TODO: speed up collision checks
 // https://github.com/apache/arrow-datafusion/issues/50
 pub struct JoinHashMap {
-    // Stores hash value to first index
+    // Stores hash value to last row index
     pub map: RawTable<(u64, u64)>,
     // Stores indices in chained list data structure
     pub next: Vec<u64>,
 }
 
-/// SymmetricJoinHashMap is similar to JoinHashMap, except that it stores the 
indices inline, allowing it to mutate
-/// and shrink the indices.
-pub struct SymmetricJoinHashMap(pub RawTable<(u64, SmallVec<[u64; 1]>)>);
-
 impl JoinHashMap {
     pub(crate) fn with_capacity(capacity: usize) -> Self {
         JoinHashMap {
@@ -117,37 +111,201 @@ impl JoinHashMap {
     }
 }
 
-impl SymmetricJoinHashMap {
+/// Trait defining methods that must be implemented by a hash map type to be 
used for joins.
+pub trait JoinHashMapType {
+    /// The type of list used to store the chained next-row indices.
+    type NextType: IndexMut<usize, Output = u64>;
+    /// Extend with zero
+    fn extend_zero(&mut self, len: usize);
+    /// Returns mutable references to the hash map and the next.
+    fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType);
+    /// Returns a reference to the hash map.
+    fn get_map(&self) -> &RawTable<(u64, u64)>;
+    /// Returns a reference to the next.
+    fn get_list(&self) -> &Self::NextType;
+}
+
+/// Implementation of `JoinHashMapType` for `JoinHashMap`.
+impl JoinHashMapType for JoinHashMap {
+    type NextType = Vec<u64>;
+
+    // Void implementation
+    fn extend_zero(&mut self, _: usize) {}
+
+    /// Get mutable references to the hash map and the next.
+    fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) {
+        (&mut self.map, &mut self.next)
+    }
+
+    /// Get a reference to the hash map.
+    fn get_map(&self) -> &RawTable<(u64, u64)> {
+        &self.map
+    }
+
+    /// Get a reference to the next.
+    fn get_list(&self) -> &Self::NextType {
+        &self.next
+    }
+}
+
+/// Implementation of `JoinHashMapType` for `PruningJoinHashMap`.
+impl JoinHashMapType for PruningJoinHashMap {
+    type NextType = VecDeque<u64>;
+
+    // Extend with zero
+    fn extend_zero(&mut self, len: usize) {
+        self.next.resize(self.next.len() + len, 0)
+    }
+
+    /// Get mutable references to the hash map and the next.
+    fn get_mut(&mut self) -> (&mut RawTable<(u64, u64)>, &mut Self::NextType) {
+        (&mut self.map, &mut self.next)
+    }
+
+    /// Get a reference to the hash map.
+    fn get_map(&self) -> &RawTable<(u64, u64)> {
+        &self.map
+    }
+
+    /// Get a reference to the next.
+    fn get_list(&self) -> &Self::NextType {
+        &self.next
+    }
+}
+
+impl fmt::Debug for JoinHashMap {
+    fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
+        Ok(())
+    }
+}
+
+/// The `PruningJoinHashMap` is similar to a regular `JoinHashMap`, but with
+/// the capability of pruning elements in an efficient manner. This structure
+/// is particularly useful for cases where it's necessary to remove elements
+/// from the map based on their buffer order.
+///
+/// # Example
+///
+/// ``` text
+/// Let's continue the example of `JoinHashMap` and then show how 
`PruningJoinHashMap` would
+/// handle the pruning scenario.
+///
+/// Insert the pair (1,4) into the `PruningJoinHashMap`:
+/// map:
+/// ---------
+/// | 1 | 5 |
+/// | 2 | 3 |
+/// ---------
+/// list:
+/// ---------------------
+/// | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices 
values 4,3,1)
+/// ---------------------
+///
+/// Now, let's prune 3 rows from `PruningJoinHashMap`:
+/// map:
+/// ---------
+/// | 1 | 5 |
+/// ---------
+/// list:
+/// ---------
+/// | 2 | 4 | <--- hash value 1 maps to 2 (5 - 3), 1 (4 - 3), NA (2 - 3) 
(which means indices values 1,0)
+/// ---------
+///
+/// After pruning, the | 2 | 3 | entry is deleted from `PruningJoinHashMap` 
since
+/// there are no values left for this key.
+/// ```
+pub struct PruningJoinHashMap {
+    /// Stores hash value to last row index
+    pub map: RawTable<(u64, u64)>,
+    /// Stores indices in chained list data structure
+    pub next: VecDeque<u64>,
+}
+
+impl PruningJoinHashMap {
+    /// Constructs a new `PruningJoinHashMap` with the given capacity.
+    /// Both the map and the list are pre-allocated with the provided capacity.
+    ///
+    /// # Arguments
+    /// * `capacity`: The initial capacity of the hash map.
+    ///
+    /// # Returns
+    /// A new instance of `PruningJoinHashMap`.
     pub(crate) fn with_capacity(capacity: usize) -> Self {
-        Self(RawTable::with_capacity(capacity))
+        PruningJoinHashMap {
+            map: RawTable::with_capacity(capacity),
+            next: VecDeque::with_capacity(capacity),
+        }
     }
 
-    /// In this implementation, the scale_factor variable determines how 
conservative the shrinking strategy is.
-    /// The value of scale_factor is set to 4, which means the capacity will 
be reduced by 25%
-    /// when necessary. You can adjust the scale_factor value to achieve the 
desired
-    /// balance between memory usage and performance.
-    //
-    // If you increase the scale_factor, the capacity will shrink less 
aggressively,
-    // leading to potentially higher memory usage but fewer resizes.
-    // Conversely, if you decrease the scale_factor, the capacity will shrink 
more aggressively,
-    // potentially leading to lower memory usage but more frequent resizing.
+    /// Shrinks the capacity of the hash map, if necessary, based on the
+    /// provided scale factor.
+    ///
+    /// # Arguments
+    /// * `scale_factor`: The scale factor that determines how conservative the
+    ///   shrinking strategy is. The capacity will be reduced by 
1/`scale_factor`
+    ///   when necessary.
+    ///
+    /// # Note
+    /// Increasing the scale factor results in less aggressive capacity 
shrinking,
+    /// leading to potentially higher memory usage but fewer resizes. 
Conversely,
+    /// decreasing the scale factor results in more aggressive capacity 
shrinking,
+    /// potentially leading to lower memory usage but more frequent resizing.
     pub(crate) fn shrink_if_necessary(&mut self, scale_factor: usize) {
-        let capacity = self.0.capacity();
-        let len = self.0.len();
+        let capacity = self.map.capacity();
 
-        if capacity > scale_factor * len {
+        if capacity > scale_factor * self.map.len() {
             let new_capacity = (capacity * (scale_factor - 1)) / scale_factor;
-            self.0.shrink_to(new_capacity, |(hash, _)| *hash)
+            // Resize the map with the new capacity.
+            self.map.shrink_to(new_capacity, |(hash, _)| *hash)
         }
     }
 
+    /// Calculates the size of the `PruningJoinHashMap` in bytes.
+    ///
+    /// # Returns
+    /// The size of the hash map in bytes.
     pub(crate) fn size(&self) -> usize {
-        self.0.allocation_info().1.size()
+        self.map.allocation_info().1.size()
+            + self.next.capacity() * std::mem::size_of::<u64>()
     }
-}
 
-impl fmt::Debug for JoinHashMap {
-    fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result {
+    /// Removes hash values from the map and the list based on the given 
pruning
+    /// length and deleting offset.
+    ///
+    /// # Arguments
+    /// * `prune_length`: The number of elements to remove from the list.
+    /// * `deleting_offset`: The offset used to determine which hash values to 
remove from the map.
+    ///
+    /// # Returns
+    /// A `Result` indicating whether the operation was successful.
+    pub(crate) fn prune_hash_values(
+        &mut self,
+        prune_length: usize,
+        deleting_offset: u64,
+        shrink_factor: usize,
+    ) -> Result<()> {
+        // Remove elements from the list based on the pruning length.
+        self.next.drain(0..prune_length);
+
+        // Calculate the keys that should be removed from the map.
+        let removable_keys = unsafe {
+            self.map
+                .iter()
+                .map(|bucket| bucket.as_ref())
+                .filter_map(|(hash, tail_index)| {
+                    (*tail_index < prune_length as u64 + 
deleting_offset).then_some(*hash)
+                })
+                .collect::<Vec<_>>()
+        };
+
+        // Remove the keys from the map.
+        removable_keys.into_iter().for_each(|hash_value| {
+            self.map
+                .remove_entry(hash_value, |(hash, _)| hash_value == *hash);
+        });
+
+        // Shrink the map if necessary.
+        self.shrink_if_necessary(shrink_factor);
         Ok(())
     }
 }
@@ -682,7 +840,6 @@ pub mod tests {
     use datafusion_common::ScalarValue;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::{binary, cast, col, lit};
-    use smallvec::smallvec;
     use std::sync::Arc;
 
     /// Filter expr for a + b > c + 10 AND a + b < c + 100
@@ -1020,40 +1177,40 @@ pub mod tests {
     #[test]
     fn test_shrink_if_necessary() {
         let scale_factor = 4;
-        let mut join_hash_map = SymmetricJoinHashMap::with_capacity(100);
+        let mut join_hash_map = PruningJoinHashMap::with_capacity(100);
         let data_size = 2000;
         let deleted_part = 3 * data_size / 4;
         // Add elements to the JoinHashMap
         for hash_value in 0..data_size {
-            join_hash_map.0.insert(
+            join_hash_map.map.insert(
                 hash_value,
-                (hash_value, smallvec![hash_value]),
+                (hash_value, hash_value),
                 |(hash, _)| *hash,
             );
         }
 
-        assert_eq!(join_hash_map.0.len(), data_size as usize);
-        assert!(join_hash_map.0.capacity() >= data_size as usize);
+        assert_eq!(join_hash_map.map.len(), data_size as usize);
+        assert!(join_hash_map.map.capacity() >= data_size as usize);
 
         // Remove some elements from the JoinHashMap
         for hash_value in 0..deleted_part {
             join_hash_map
-                .0
+                .map
                 .remove_entry(hash_value, |(hash, _)| hash_value == *hash);
         }
 
-        assert_eq!(join_hash_map.0.len(), (data_size - deleted_part) as usize);
+        assert_eq!(join_hash_map.map.len(), (data_size - deleted_part) as 
usize);
 
         // Old capacity
-        let old_capacity = join_hash_map.0.capacity();
+        let old_capacity = join_hash_map.map.capacity();
 
         // Test shrink_if_necessary
         join_hash_map.shrink_if_necessary(scale_factor);
 
         // The capacity should be reduced by the scale factor
         let new_expected_capacity =
-            join_hash_map.0.capacity() * (scale_factor - 1) / scale_factor;
-        assert!(join_hash_map.0.capacity() >= new_expected_capacity);
-        assert!(join_hash_map.0.capacity() <= old_capacity);
+            join_hash_map.map.capacity() * (scale_factor - 1) / scale_factor;
+        assert!(join_hash_map.map.capacity() >= new_expected_capacity);
+        assert!(join_hash_map.map.capacity() <= old_capacity);
     }
 }
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs 
b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 3309a39b61..1c664adfbb 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -25,7 +25,6 @@
 //! This plan uses the [OneSideHashJoiner] object to facilitate join 
calculations
 //! for both its children.
 
-use std::collections::{HashMap, VecDeque};
 use std::fmt;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -33,29 +32,15 @@ use std::task::Poll;
 use std::vec;
 use std::{any::Any, usize};
 
-use ahash::RandomState;
-use arrow::array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, 
PrimitiveBuilder};
-use arrow::compute::concat_batches;
-use arrow::datatypes::{Schema, SchemaRef};
-use arrow::record_batch::RecordBatch;
-use arrow_array::builder::{UInt32BufferBuilder, UInt64BufferBuilder};
-use arrow_array::{UInt32Array, UInt64Array};
-use datafusion_physical_expr::hash_utils::create_hashes;
-use datafusion_physical_expr::PhysicalExpr;
-use futures::stream::{select, BoxStream};
-use futures::{Stream, StreamExt};
-use hashbrown::HashSet;
-use parking_lot::Mutex;
-use smallvec::smallvec;
-
-use datafusion_execution::memory_pool::MemoryConsumer;
-use datafusion_physical_expr::intervals::ExprIntervalGraph;
-
 use crate::physical_plan::common::SharedMemoryReservation;
+use crate::physical_plan::joins::hash_join::{
+    build_equal_condition_join_indices, update_hash,
+};
 use crate::physical_plan::joins::hash_join_utils::{
     build_filter_expression_graph, calculate_filter_expr_intervals, 
combine_two_batches,
     convert_sort_expr_with_filter_schema, get_pruning_anti_indices,
     get_pruning_semi_indices, record_visited_indices, 
IntervalCalculatorInnerState,
+    PruningJoinHashMap,
 };
 use crate::physical_plan::joins::StreamJoinPartitionMode;
 use crate::physical_plan::DisplayAs;
@@ -74,14 +59,23 @@ use crate::physical_plan::{
     DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, 
Partitioning,
     RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
+
+use arrow::array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray, 
PrimitiveBuilder};
+use arrow::compute::concat_batches;
+use arrow::datatypes::{Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
 use datafusion_common::utils::bisect;
 use datafusion_common::{internal_err, plan_err, JoinType};
 use datafusion_common::{DataFusionError, Result};
+use datafusion_execution::memory_pool::MemoryConsumer;
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::intervals::ExprIntervalGraph;
 
-use super::hash_join::equal_rows;
-use super::hash_join_utils::SymmetricJoinHashMap;
-use super::utils::apply_join_filter_to_indices;
+use ahash::RandomState;
+use futures::stream::{select, BoxStream};
+use futures::{Stream, StreamExt};
+use hashbrown::HashSet;
+use parking_lot::Mutex;
 
 const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;
 
@@ -620,41 +614,6 @@ impl Stream for SymmetricHashJoinStream {
     }
 }
 
-fn prune_hash_values(
-    prune_length: usize,
-    hashmap: &mut SymmetricJoinHashMap,
-    row_hash_values: &mut VecDeque<u64>,
-    offset: u64,
-) -> Result<()> {
-    // Create a (hash)-(row number set) map
-    let mut hash_value_map: HashMap<u64, HashSet<u64>> = HashMap::new();
-    for index in 0..prune_length {
-        let hash_value = row_hash_values.pop_front().unwrap();
-        if let Some(set) = hash_value_map.get_mut(&hash_value) {
-            set.insert(offset + index as u64);
-        } else {
-            let mut set = HashSet::new();
-            set.insert(offset + index as u64);
-            hash_value_map.insert(hash_value, set);
-        }
-    }
-    for (hash_value, index_set) in hash_value_map.iter() {
-        if let Some((_, separation_chain)) = hashmap
-            .0
-            .get_mut(*hash_value, |(hash, _)| hash_value == hash)
-        {
-            separation_chain.retain(|n| !index_set.contains(n));
-            if separation_chain.is_empty() {
-                hashmap
-                    .0
-                    .remove_entry(*hash_value, |(hash, _)| hash_value == hash);
-            }
-        }
-    }
-    hashmap.shrink_if_necessary(HASHMAP_SHRINK_SCALE_FACTOR);
-    Ok(())
-}
-
 /// Determine the pruning length for `buffer`.
 ///
 /// This function evaluates the build side filter expression, converts the
@@ -834,144 +793,6 @@ pub(crate) fn build_side_determined_results(
     }
 }
 
-/// Gets build and probe indices which satisfy the on condition (including
-/// the equality condition and the join filter) in the join.
-#[allow(clippy::too_many_arguments)]
-pub fn build_join_indices(
-    probe_batch: &RecordBatch,
-    build_hashmap: &SymmetricJoinHashMap,
-    build_input_buffer: &RecordBatch,
-    on_build: &[Column],
-    on_probe: &[Column],
-    filter: Option<&JoinFilter>,
-    random_state: &RandomState,
-    null_equals_null: bool,
-    hashes_buffer: &mut Vec<u64>,
-    offset: Option<usize>,
-    build_side: JoinSide,
-) -> Result<(UInt64Array, UInt32Array)> {
-    // Get the indices that satisfy the equality condition, like `left.a1 = 
right.a2`
-    let (build_indices, probe_indices) = build_equal_condition_join_indices(
-        build_hashmap,
-        build_input_buffer,
-        probe_batch,
-        on_build,
-        on_probe,
-        random_state,
-        null_equals_null,
-        hashes_buffer,
-        offset,
-    )?;
-    if let Some(filter) = filter {
-        // Filter the indices which satisfy the non-equal join condition, like 
`left.b1 = 10`
-        apply_join_filter_to_indices(
-            build_input_buffer,
-            probe_batch,
-            build_indices,
-            probe_indices,
-            filter,
-            build_side,
-        )
-    } else {
-        Ok((build_indices, probe_indices))
-    }
-}
-
-// Returns build/probe indices satisfying the equality condition.
-// On LEFT.b1 = RIGHT.b2
-// LEFT Table:
-//  a1  b1  c1
-//  1   1   10
-//  3   3   30
-//  5   5   50
-//  7   7   70
-//  9   8   90
-//  11  8   110
-// 13   10  130
-// RIGHT Table:
-//  a2   b2  c2
-//  2    2   20
-//  4    4   40
-//  6    6   60
-//  8    8   80
-// 10   10  100
-// 12   10  120
-// The result is
-// "+----+----+-----+----+----+-----+",
-// "| a1 | b1 | c1  | a2 | b2 | c2  |",
-// "+----+----+-----+----+----+-----+",
-// "| 11 | 8  | 110 | 8  | 8  | 80  |",
-// "| 13 | 10 | 130 | 10 | 10 | 100 |",
-// "| 13 | 10 | 130 | 12 | 10 | 120 |",
-// "| 9  | 8  | 90  | 8  | 8  | 80  |",
-// "+----+----+-----+----+----+-----+"
-// And the result of build and probe indices are:
-// Build indices:  5, 6, 6, 4
-// Probe indices: 3, 4, 5, 3
-#[allow(clippy::too_many_arguments)]
-pub fn build_equal_condition_join_indices(
-    build_hashmap: &SymmetricJoinHashMap,
-    build_input_buffer: &RecordBatch,
-    probe_batch: &RecordBatch,
-    build_on: &[Column],
-    probe_on: &[Column],
-    random_state: &RandomState,
-    null_equals_null: bool,
-    hashes_buffer: &mut Vec<u64>,
-    offset: Option<usize>,
-) -> Result<(UInt64Array, UInt32Array)> {
-    let keys_values = probe_on
-        .iter()
-        .map(|c| 
Ok(c.evaluate(probe_batch)?.into_array(probe_batch.num_rows())))
-        .collect::<Result<Vec<_>>>()?;
-    let build_join_values = build_on
-        .iter()
-        .map(|c| {
-            Ok(c.evaluate(build_input_buffer)?
-                .into_array(build_input_buffer.num_rows()))
-        })
-        .collect::<Result<Vec<_>>>()?;
-    hashes_buffer.clear();
-    hashes_buffer.resize(probe_batch.num_rows(), 0);
-    let hash_values = create_hashes(&keys_values, random_state, 
hashes_buffer)?;
-    // Using a buffer builder to avoid slower normal builder
-    let mut build_indices = UInt64BufferBuilder::new(0);
-    let mut probe_indices = UInt32BufferBuilder::new(0);
-    let offset_value = offset.unwrap_or(0);
-    // Visit all of the probe rows
-    for (row, hash_value) in hash_values.iter().enumerate() {
-        // Get the hash and find it in the build index
-        // For every item on the build and probe we check if it matches
-        // This possibly contains rows with hash collisions,
-        // So we have to check here whether rows are equal or not
-        if let Some((_, indices)) = build_hashmap
-            .0
-            .get(*hash_value, |(hash, _)| *hash_value == *hash)
-        {
-            for &i in indices {
-                // Check hash collisions
-                let offset_build_index = i as usize - offset_value;
-                // Check hash collisions
-                if equal_rows(
-                    offset_build_index,
-                    row,
-                    &build_join_values,
-                    &keys_values,
-                    null_equals_null,
-                )? {
-                    build_indices.append(offset_build_index as u64);
-                    probe_indices.append(row as u32);
-                }
-            }
-        }
-    }
-
-    Ok((
-        PrimitiveArray::new(build_indices.finish().into(), None),
-        PrimitiveArray::new(probe_indices.finish().into(), None),
-    ))
-}
-
 /// This method performs a join between the build side input buffer and the 
probe side batch.
 ///
 /// # Arguments
@@ -1006,18 +827,18 @@ pub(crate) fn join_with_probe_batch(
     if build_hash_joiner.input_buffer.num_rows() == 0 || 
probe_batch.num_rows() == 0 {
         return Ok(None);
     }
-    let (build_indices, probe_indices) = build_join_indices(
-        probe_batch,
+    let (build_indices, probe_indices) = build_equal_condition_join_indices(
         &build_hash_joiner.hashmap,
         &build_hash_joiner.input_buffer,
+        probe_batch,
         &build_hash_joiner.on,
         &probe_hash_joiner.on,
-        filter,
         random_state,
         null_equals_null,
         &mut build_hash_joiner.hashes_buffer,
-        Some(build_hash_joiner.deleted_offset),
+        filter,
         build_hash_joiner.build_side,
+        Some(build_hash_joiner.deleted_offset),
     )?;
     if need_to_produce_result_in_final(build_hash_joiner.build_side, 
join_type) {
         record_visited_indices(
@@ -1063,9 +884,7 @@ pub struct OneSideHashJoiner {
     /// Columns from the side
     pub(crate) on: Vec<Column>,
     /// Hashmap
-    pub(crate) hashmap: SymmetricJoinHashMap,
-    /// To optimize hash deleting in case of pruning, we hold them in memory
-    row_hash_values: VecDeque<u64>,
+    pub(crate) hashmap: PruningJoinHashMap,
     /// Reuse the hashes buffer
     pub(crate) hashes_buffer: Vec<u64>,
     /// Matched rows
@@ -1084,7 +903,6 @@ impl OneSideHashJoiner {
         size += self.input_buffer.get_array_memory_size();
         size += std::mem::size_of_val(&self.on);
         size += self.hashmap.size();
-        size += self.row_hash_values.capacity() * std::mem::size_of::<u64>();
         size += self.hashes_buffer.capacity() * std::mem::size_of::<u64>();
         size += self.visited_rows.capacity() * std::mem::size_of::<usize>();
         size += std::mem::size_of_val(&self.offset);
@@ -1096,8 +914,7 @@ impl OneSideHashJoiner {
             build_side,
             input_buffer: RecordBatch::new_empty(schema),
             on,
-            hashmap: SymmetricJoinHashMap::with_capacity(0),
-            row_hash_values: VecDeque::new(),
+            hashmap: PruningJoinHashMap::with_capacity(0),
             hashes_buffer: vec![],
             visited_rows: HashSet::new(),
             offset: 0,
@@ -1105,39 +922,6 @@ impl OneSideHashJoiner {
         }
     }
 
-    pub fn update_hash(
-        on: &[Column],
-        batch: &RecordBatch,
-        hash_map: &mut SymmetricJoinHashMap,
-        offset: usize,
-        random_state: &RandomState,
-        hashes_buffer: &mut Vec<u64>,
-    ) -> Result<()> {
-        // evaluate the keys
-        let keys_values = on
-            .iter()
-            .map(|c| Ok(c.evaluate(batch)?.into_array(batch.num_rows())))
-            .collect::<Result<Vec<_>>>()?;
-        // calculate the hash values
-        let hash_values = create_hashes(&keys_values, random_state, 
hashes_buffer)?;
-        // insert hashes to key of the hashmap
-        for (row, hash_value) in hash_values.iter().enumerate() {
-            let item = hash_map
-                .0
-                .get_mut(*hash_value, |(hash, _)| *hash_value == *hash);
-            if let Some((_, indices)) = item {
-                indices.push((row + offset) as u64);
-            } else {
-                hash_map.0.insert(
-                    *hash_value,
-                    (*hash_value, smallvec![(row + offset) as u64]),
-                    |(hash, _)| *hash,
-                );
-            }
-        }
-        Ok(())
-    }
-
     /// Updates the internal state of the [OneSideHashJoiner] with the 
incoming batch.
     ///
     /// # Arguments
@@ -1159,16 +943,15 @@ impl OneSideHashJoiner {
         self.hashes_buffer.resize(batch.num_rows(), 0);
         // Get allocation_info before adding the item
         // Update the hashmap with the join key values and hashes of the 
incoming batch:
-        Self::update_hash(
+        update_hash(
             &self.on,
             batch,
             &mut self.hashmap,
             self.offset,
             random_state,
             &mut self.hashes_buffer,
+            self.deleted_offset,
         )?;
-        // Add the hashes buffer to the hash value deque:
-        self.row_hash_values.extend(self.hashes_buffer.iter());
         Ok(())
     }
 
@@ -1228,11 +1011,10 @@ impl OneSideHashJoiner {
 
     pub(crate) fn prune_internal_state(&mut self, prune_length: usize) -> 
Result<()> {
         // Prune the hash values:
-        prune_hash_values(
+        self.hashmap.prune_hash_values(
             prune_length,
-            &mut self.hashmap,
-            &mut self.row_hash_values,
             self.deleted_offset as u64,
+            HASHMAP_SHRINK_SCALE_FACTOR,
         )?;
         // Remove pruned rows from the visited rows set:
         for row in self.deleted_offset..(self.deleted_offset + prune_length) {
@@ -1448,7 +1230,7 @@ mod tests {
     };
     use datafusion_common::ScalarValue;
 
-    const TABLE_SIZE: i32 = 100;
+    const TABLE_SIZE: i32 = 1000;
 
     pub async fn experiment(
         left: Arc<dyn ExecutionPlan>,
@@ -1630,6 +1412,61 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test(flavor = "multi_thread")]
+    async fn join_all_one_ascending_numeric_v2() -> Result<()> {
+        let join_type = JoinType::Inner;
+        let cardinality = (4, 5);
+        let case_expr = 2;
+        let task_ctx = Arc::new(TaskContext::default());
+        let (left_batch, right_batch) = build_sides_record_batches(1000, 
cardinality)?;
+        let left_schema = &left_batch.schema();
+        let right_schema = &right_batch.schema();
+        let left_sorted = vec![PhysicalSortExpr {
+            expr: col("la1", left_schema)?,
+            options: SortOptions::default(),
+        }];
+        let right_sorted = vec![PhysicalSortExpr {
+            expr: col("ra1", right_schema)?,
+            options: SortOptions::default(),
+        }];
+        let (left, right) = create_memory_table(
+            left_batch,
+            right_batch,
+            vec![left_sorted],
+            vec![right_sorted],
+            13,
+        )?;
+
+        let on = vec![(
+            Column::new_with_schema("lc1", left_schema)?,
+            Column::new_with_schema("rc1", right_schema)?,
+        )];
+
+        let intermediate_schema = Schema::new(vec![
+            Field::new("left", DataType::Int32, true),
+            Field::new("right", DataType::Int32, true),
+        ]);
+        let filter_expr = join_expr_tests_fixture_i32(
+            case_expr,
+            col("left", &intermediate_schema)?,
+            col("right", &intermediate_schema)?,
+        );
+        let column_indices = vec![
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Left,
+            },
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Right,
+            },
+        ];
+        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+
+        experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
+        Ok(())
+    }
+
     #[rstest]
     #[tokio::test(flavor = "multi_thread")]
     async fn join_without_sort_information(

Reply via email to