This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f5d23ff58 memory limited hash join (#5490)
f5d23ff58 is described below

commit f5d23ff582f1393d5ce94cf977d11e59f90fe523
Author: Eduard Karacharov <[email protected]>
AuthorDate: Thu Mar 9 15:36:49 2023 +0300

    memory limited hash join (#5490)
---
 datafusion/core/src/physical_plan/common.rs        |   6 +
 .../core/src/physical_plan/joins/cross_join.rs     |  36 +-
 .../core/src/physical_plan/joins/hash_join.rs      | 379 ++++++++++++++-------
 datafusion/core/tests/memory_limit.rs              |  10 +
 4 files changed, 289 insertions(+), 142 deletions(-)

diff --git a/datafusion/core/src/physical_plan/common.rs 
b/datafusion/core/src/physical_plan/common.rs
index 68f888210..2f02aaa27 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -39,8 +39,14 @@ use std::task::{Context, Poll};
 use tokio::sync::mpsc;
 use tokio::task::JoinHandle;
 
+/// [`MemoryReservation`] used across query execution streams
 pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
 
+/// [`MemoryReservation`] used at query operator level
+/// `Option` wrapper allows to initialize empty reservation in operator 
constructor,
+/// and set it to actual reservation at stream level.
+pub(crate) type OperatorMemoryReservation = 
Arc<Mutex<Option<SharedMemoryReservation>>>;
+
 /// Stream of record batches
 pub struct SizedRecordBatchStream {
     schema: SchemaRef,
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs 
b/datafusion/core/src/physical_plan/joins/cross_join.rs
index 0523a2e35..d4933b9d6 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -27,7 +27,7 @@ use arrow::record_batch::RecordBatch;
 
 use crate::execution::context::TaskContext;
 use crate::execution::memory_pool::MemoryConsumer;
-use crate::physical_plan::common::SharedMemoryReservation;
+use crate::physical_plan::common::{OperatorMemoryReservation, 
SharedMemoryReservation};
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
     coalesce_batches::concat_batches, 
coalesce_partitions::CoalescePartitionsExec,
@@ -60,6 +60,8 @@ pub struct CrossJoinExec {
     schema: SchemaRef,
     /// Build-side data
     left_fut: OnceAsync<JoinLeftData>,
+    /// Memory reservation for build-side data
+    reservation: OperatorMemoryReservation,
     /// Execution plan metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -83,6 +85,7 @@ impl CrossJoinExec {
             right,
             schema,
             left_fut: Default::default(),
+            reservation: Default::default(),
             metrics: ExecutionPlanMetricsSet::default(),
         }
     }
@@ -221,17 +224,29 @@ impl ExecutionPlan for CrossJoinExec {
         let stream = self.right.execute(partition, context.clone())?;
 
         let join_metrics = BuildProbeJoinMetrics::new(partition, 
&self.metrics);
-        let reservation = Arc::new(Mutex::new(
-            MemoryConsumer::new(format!("CrossJoinStream[{partition}]"))
-                .register(context.memory_pool()),
-        ));
+
+        // Initialization of operator-level reservation
+        {
+            let mut reservation_lock = self.reservation.lock();
+            if reservation_lock.is_none() {
+                *reservation_lock = Some(Arc::new(Mutex::new(
+                    
MemoryConsumer::new("CrossJoinExec").register(context.memory_pool()),
+                )));
+            };
+        }
+
+        let reservation = self.reservation.lock().clone().ok_or_else(|| {
+            DataFusionError::Internal(
+                "Operator-level memory reservation is not 
initialized".to_string(),
+            )
+        })?;
 
         let left_fut = self.left_fut.once(|| {
             load_left_input(
                 self.left.clone(),
                 context,
                 join_metrics.clone(),
-                reservation.clone(),
+                reservation,
             )
         });
 
@@ -242,7 +257,6 @@ impl ExecutionPlan for CrossJoinExec {
             right_batch: Arc::new(parking_lot::Mutex::new(None)),
             left_index: 0,
             join_metrics,
-            reservation,
         }))
     }
 
@@ -346,8 +360,6 @@ struct CrossJoinStream {
     right_batch: Arc<parking_lot::Mutex<Option<RecordBatch>>>,
     /// join execution metrics
     join_metrics: BuildProbeJoinMetrics,
-    /// memory reservation
-    reservation: SharedMemoryReservation,
 }
 
 impl RecordBatchStream for CrossJoinStream {
@@ -452,10 +464,7 @@ impl CrossJoinStream {
 
                     Some(result)
                 }
-                other => {
-                    self.reservation.lock().free();
-                    other
-                }
+                other => other,
             })
     }
 }
@@ -683,6 +692,7 @@ mod tests {
             err.to_string(),
             "External error: Resources exhausted: Failed to allocate 
additional"
         );
+        assert_contains!(err.to_string(), "CrossJoinExec");
 
         Ok(())
     }
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs 
b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 021f41ef1..03ac5050f 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -32,11 +32,11 @@ use arrow::{
         Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, 
UInt64Type,
         UInt8Type,
     },
+    util::bit_util,
 };
 use smallvec::{smallvec, SmallVec};
 use std::sync::Arc;
-use std::{any::Any, usize};
-use std::{time::Instant, vec};
+use std::{any::Any, usize, vec};
 
 use futures::{ready, Stream, StreamExt, TryStreamExt};
 
@@ -58,15 +58,17 @@ use hashbrown::raw::RawTable;
 use crate::physical_plan::{
     coalesce_batches::concat_batches,
     coalesce_partitions::CoalescePartitionsExec,
+    common::{OperatorMemoryReservation, SharedMemoryReservation},
     expressions::Column,
     expressions::PhysicalSortExpr,
     hash_utils::create_hashes,
     joins::utils::{
         adjust_right_output_partitioning, build_join_schema, 
check_join_is_valid,
         combine_join_equivalence_properties, estimate_join_statistics,
-        partitioned_join_output_partitioning, ColumnIndex, JoinFilter, JoinOn,
+        partitioned_join_output_partitioning, BuildProbeJoinMetrics, 
ColumnIndex,
+        JoinFilter, JoinOn,
     },
-    metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
+    metrics::{ExecutionPlanMetricsSet, MetricsSet},
     DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, 
Partitioning,
     PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
@@ -76,7 +78,7 @@ use crate::logical_expr::JoinType;
 
 use crate::arrow::array::BooleanBufferBuilder;
 use crate::arrow::datatypes::TimeUnit;
-use crate::execution::context::TaskContext;
+use crate::execution::{context::TaskContext, memory_pool::MemoryConsumer};
 
 use super::{
     utils::{OnceAsync, OnceFut},
@@ -86,7 +88,7 @@ use crate::physical_plan::joins::utils::{
     adjust_indices_by_join_type, apply_join_filter_to_indices, 
build_batch_from_indices,
     get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide,
 };
-use log::debug;
+use parking_lot::Mutex;
 use std::fmt;
 use std::task::Poll;
 
@@ -134,6 +136,8 @@ pub struct HashJoinExec {
     schema: SchemaRef,
     /// Build-side data
     left_fut: OnceAsync<JoinLeftData>,
+    /// Operator-level memory reservation for left data
+    reservation: OperatorMemoryReservation,
     /// Shares the `RandomState` for the hashing algorithm
     random_state: RandomState,
     /// Partitioning mode to use
@@ -146,50 +150,6 @@ pub struct HashJoinExec {
     pub(crate) null_equals_null: bool,
 }
 
-/// Metrics for HashJoinExec
-#[derive(Debug)]
-struct HashJoinMetrics {
-    /// Total time for joining probe-side batches to the build-side batches
-    probe_time: metrics::Time,
-    /// Total time for building hashmap
-    build_time: metrics::Time,
-    /// Number of batches consumed by this operator
-    input_batches: metrics::Count,
-    /// Number of rows consumed by this operator
-    input_rows: metrics::Count,
-    /// Number of batches produced by this operator
-    output_batches: metrics::Count,
-    /// Number of rows produced by this operator
-    output_rows: metrics::Count,
-}
-
-impl HashJoinMetrics {
-    pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
-        let probe_time = MetricBuilder::new(metrics).subset_time("probe_time", 
partition);
-
-        let build_time = MetricBuilder::new(metrics).subset_time("build_time", 
partition);
-
-        let input_batches =
-            MetricBuilder::new(metrics).counter("input_batches", partition);
-
-        let input_rows = MetricBuilder::new(metrics).counter("input_rows", 
partition);
-
-        let output_batches =
-            MetricBuilder::new(metrics).counter("output_batches", partition);
-
-        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
-
-        Self {
-            probe_time,
-            build_time,
-            input_batches,
-            input_rows,
-            output_batches,
-            output_rows,
-        }
-    }
-}
-
 impl HashJoinExec {
     /// Tries to create a new [HashJoinExec].
     /// # Error
@@ -226,6 +186,7 @@ impl HashJoinExec {
             join_type: *join_type,
             schema: Arc::new(schema),
             left_fut: Default::default(),
+            reservation: Default::default(),
             random_state,
             mode: partition_mode,
             metrics: ExecutionPlanMetricsSet::new(),
@@ -414,21 +375,57 @@ impl ExecutionPlan for HashJoinExec {
         let on_left = self.on.iter().map(|on| 
on.0.clone()).collect::<Vec<_>>();
         let on_right = self.on.iter().map(|on| 
on.1.clone()).collect::<Vec<_>>();
 
+        let join_metrics = BuildProbeJoinMetrics::new(partition, 
&self.metrics);
+
+        // Initialization of operator-level reservation
+        {
+            let mut operator_reservation_lock = self.reservation.lock();
+            if operator_reservation_lock.is_none() {
+                *operator_reservation_lock = Some(Arc::new(Mutex::new(
+                    
MemoryConsumer::new("HashJoinExec").register(context.memory_pool()),
+                )));
+            };
+        }
+
+        let operator_reservation = 
self.reservation.lock().clone().ok_or_else(|| {
+            DataFusionError::Internal(
+                "Operator-level memory reservation is not 
initialized".to_string(),
+            )
+        })?;
+
+        // Initialization of stream-level reservation
+        let reservation = Arc::new(Mutex::new(
+            MemoryConsumer::new(format!("HashJoinStream[{partition}]"))
+                .register(context.memory_pool()),
+        ));
+
+        // Memory reservation for left-side data depends on PartitionMode:
+        // - operator-level for `CollectLeft` mode
+        // - stream-level for partitioned mode
+        //
+        // This approach allows to avoid cases when left data could potentially
+        // outlive its memory reservation and rely on `MemoryReservation` 
destructors
+        // for releasing memory in pool.
         let left_fut = match self.mode {
             PartitionMode::CollectLeft => self.left_fut.once(|| {
                 collect_left_input(
+                    None,
                     self.random_state.clone(),
                     self.left.clone(),
                     on_left.clone(),
                     context.clone(),
+                    join_metrics.clone(),
+                    operator_reservation.clone(),
                 )
             }),
-            PartitionMode::Partitioned => OnceFut::new(partitioned_left_input(
-                partition,
+            PartitionMode::Partitioned => OnceFut::new(collect_left_input(
+                Some(partition),
                 self.random_state.clone(),
                 self.left.clone(),
                 on_left.clone(),
                 context.clone(),
+                join_metrics.clone(),
+                reservation.clone(),
             )),
             PartitionMode::Auto => {
                 return Err(DataFusionError::Plan(format!(
@@ -453,9 +450,10 @@ impl ExecutionPlan for HashJoinExec {
             right: right_stream,
             column_indices: self.column_indices.clone(),
             random_state: self.random_state.clone(),
-            join_metrics: HashJoinMetrics::new(partition, &self.metrics),
+            join_metrics,
             null_equals_null: self.null_equals_null,
             is_exhausted: false,
+            reservation,
         }))
     }
 
@@ -493,90 +491,73 @@ impl ExecutionPlan for HashJoinExec {
 }
 
 async fn collect_left_input(
+    partition: Option<usize>,
     random_state: RandomState,
     left: Arc<dyn ExecutionPlan>,
     on_left: Vec<Column>,
     context: Arc<TaskContext>,
+    metrics: BuildProbeJoinMetrics,
+    reservation: SharedMemoryReservation,
 ) -> Result<JoinLeftData> {
     let schema = left.schema();
-    let start = Instant::now();
-    // merge all left parts into a single stream
-    let merge = {
-        if left.output_partitioning().partition_count() != 1 {
-            Arc::new(CoalescePartitionsExec::new(left))
-        } else {
-            left
-        }
-    };
-    let stream = merge.execute(0, context)?;
-
-    // This operation performs 2 steps at once:
-    // 1. creates a [JoinHashMap] of all batches from the stream
-    // 2. stores the batches in a vector.
-    let initial = (0, Vec::new());
-    let (num_rows, batches) = stream
-        .try_fold(initial, |mut acc, batch| async {
-            acc.0 += batch.num_rows();
-            acc.1.push(batch);
-            Ok(acc)
-        })
-        .await?;
 
-    let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
-    let mut hashes_buffer = Vec::new();
-    let mut offset = 0;
-    for batch in batches.iter() {
-        hashes_buffer.clear();
-        hashes_buffer.resize(batch.num_rows(), 0);
-        update_hash(
-            &on_left,
-            batch,
-            &mut hashmap,
-            offset,
-            &random_state,
-            &mut hashes_buffer,
-        )?;
-        offset += batch.num_rows();
-    }
-    // Merge all batches into a single batch, so we
-    // can directly index into the arrays
-    let single_batch = concat_batches(&schema, &batches, num_rows)?;
-
-    debug!(
-        "Built build-side of hash join containing {} rows in {} ms",
-        num_rows,
-        start.elapsed().as_millis()
-    );
-
-    Ok((hashmap, single_batch))
-}
-
-async fn partitioned_left_input(
-    partition: usize,
-    random_state: RandomState,
-    left: Arc<dyn ExecutionPlan>,
-    on_left: Vec<Column>,
-    context: Arc<TaskContext>,
-) -> Result<JoinLeftData> {
-    let schema = left.schema();
+    let (left_input, left_input_partition) = if let Some(partition) = 
partition {
+        (left, partition)
+    } else {
+        let merge = {
+            if left.output_partitioning().partition_count() != 1 {
+                Arc::new(CoalescePartitionsExec::new(left))
+            } else {
+                left
+            }
+        };
 
-    let start = Instant::now();
+        (merge, 0)
+    };
 
-    // Load 1 partition of left side in memory
-    let stream = left.execute(partition, context.clone())?;
+    // Depending on partition argument load single partition or whole left 
side in memory
+    let stream = left_input.execute(left_input_partition, context.clone())?;
 
     // This operation performs 2 steps at once:
     // 1. creates a [JoinHashMap] of all batches from the stream
     // 2. stores the batches in a vector.
-    let initial = (0, Vec::new());
-    let (num_rows, batches) = stream
+    let initial = (Vec::new(), 0, metrics, reservation);
+    let (batches, num_rows, metrics, reservation) = stream
         .try_fold(initial, |mut acc, batch| async {
-            acc.0 += batch.num_rows();
-            acc.1.push(batch);
+            let batch_size = batch.get_array_memory_size();
+            // Reserve memory for incoming batch
+            acc.3.lock().try_grow(batch_size)?;
+            // Update metrics
+            acc.2.build_mem_used.add(batch_size);
+            acc.2.build_input_batches.add(1);
+            acc.2.build_input_rows.add(batch.num_rows());
+            // Update rowcount
+            acc.1 += batch.num_rows();
+            // Push batch to output
+            acc.0.push(batch);
             Ok(acc)
         })
         .await?;
 
+    // Estimation of memory size, required for hashtable, prior to allocation.
+    // Final result can be verified using `RawTable.allocation_info()`
+    //
+    // For majority of cases hashbrown overestimates buckets qty to keep ~1/8 
of them empty.
+    // This formula leads to overallocation for small tables (< 8 elements) 
but fine overall.
+    let estimated_buckets = (num_rows.checked_mul(8).ok_or_else(|| {
+        DataFusionError::Execution(
+            "usize overflow while estimating number of hasmap 
buckets".to_string(),
+        )
+    })? / 7)
+        .next_power_of_two();
+    // 32 bytes per `(u64, SmallVec<[u64; 1]>)`
+    // + 1 byte for each bucket
+    // + 16 bytes fixed
+    let estimated_hastable_size = 32 * estimated_buckets + estimated_buckets + 
16;
+
+    reservation.lock().try_grow(estimated_hastable_size)?;
+    metrics.build_mem_used.add(estimated_hastable_size);
+
     let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
     let mut hashes_buffer = Vec::new();
     let mut offset = 0;
@@ -597,13 +578,6 @@ async fn partitioned_left_input(
     // can directly index into the arrays
     let single_batch = concat_batches(&schema, &batches, num_rows)?;
 
-    debug!(
-        "Built build-side {} of hash join containing {} rows in {} ms",
-        partition,
-        num_rows,
-        start.elapsed().as_millis()
-    );
-
     Ok((hashmap, single_batch))
 }
 
@@ -667,11 +641,13 @@ struct HashJoinStream {
     /// There is nothing to process anymore and left side is processed in case 
of left join
     is_exhausted: bool,
     /// Metrics
-    join_metrics: HashJoinMetrics,
+    join_metrics: BuildProbeJoinMetrics,
     /// Information of index and left / right placement of columns
     column_indices: Vec<ColumnIndex>,
     /// If null_equals_null is true, null == null else null != null
     null_equals_null: bool,
+    /// Memory reservation
+    reservation: SharedMemoryReservation,
 }
 
 impl RecordBatchStream for HashJoinStream {
@@ -1173,6 +1149,18 @@ impl HashJoinStream {
         };
         build_timer.done();
 
+        // Reserving memory for visited_left_side bitmap in case it hasn't 
been initialized yet
+        // and join_type requires to store it
+        if self.visited_left_side.is_none()
+            && need_produce_result_in_final(self.join_type)
+        {
+            // TODO: Replace `ceil` wrapper with stable `div_ceil` after
+            // https://github.com/rust-lang/rust/issues/88581
+            let visited_bitmap_size = bit_util::ceil(left_data.1.num_rows(), 
8);
+            self.reservation.lock().try_grow(visited_bitmap_size)?;
+            self.join_metrics.build_mem_used.add(visited_bitmap_size);
+        }
+
         let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
             let num_rows = left_data.1.num_rows();
             if need_produce_result_in_final(self.join_type) {
@@ -1196,7 +1184,7 @@ impl HashJoinStream {
                 Some(Ok(batch)) => {
                     self.join_metrics.input_batches.add(1);
                     self.join_metrics.input_rows.add(batch.num_rows());
-                    let timer = self.join_metrics.probe_time.timer();
+                    let timer = self.join_metrics.join_time.timer();
 
                     // get the matched two indices for the on condition
                     let left_right_indices = build_join_indices(
@@ -1252,7 +1240,7 @@ impl HashJoinStream {
                     result
                 }
                 None => {
-                    let timer = self.join_metrics.probe_time.timer();
+                    let timer = self.join_metrics.join_time.timer();
                     if need_produce_result_in_final(self.join_type) && 
!self.is_exhausted
                     {
                         // use the global left bitmap to produce the left 
indices and right indices
@@ -1309,10 +1297,13 @@ mod tests {
     use std::sync::Arc;
 
     use super::*;
+    use crate::execution::context::SessionConfig;
     use crate::physical_expr::expressions::BinaryExpr;
     use crate::prelude::SessionContext;
     use crate::{
         assert_batches_sorted_eq,
+        common::assert_contains,
+        execution::runtime_env::{RuntimeConfig, RuntimeEnv},
         physical_plan::{
             common,
             expressions::Column,
@@ -3040,4 +3031,134 @@ mod tests {
             );
         }
     }
+
+    #[tokio::test]
+    async fn single_partition_join_overallocation() -> Result<()> {
+        let left = build_table(
+            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+        );
+        let right = build_table(
+            ("a2", &vec![10, 11]),
+            ("b2", &vec![12, 13]),
+            ("c2", &vec![14, 15]),
+        );
+        let on = vec![(
+            Column::new_with_schema("a1", &left.schema()).unwrap(),
+            Column::new_with_schema("b2", &right.schema()).unwrap(),
+        )];
+
+        let join_types = vec![
+            JoinType::Inner,
+            JoinType::Left,
+            JoinType::Right,
+            JoinType::Full,
+            JoinType::LeftSemi,
+            JoinType::LeftAnti,
+            JoinType::RightSemi,
+            JoinType::RightAnti,
+        ];
+
+        for join_type in join_types {
+            let runtime_config = RuntimeConfig::new().with_memory_limit(100, 
1.0);
+            let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
+            let session_ctx =
+                SessionContext::with_config_rt(SessionConfig::default(), 
runtime);
+            let task_ctx = session_ctx.task_ctx();
+
+            let join = join(left.clone(), right.clone(), on.clone(), 
&join_type, false)?;
+
+            let stream = join.execute(0, task_ctx)?;
+            let err = common::collect(stream).await.unwrap_err();
+
+            assert_contains!(
+                err.to_string(),
+                "External error: Resources exhausted: Failed to allocate 
additional"
+            );
+
+            // Asserting that operator-level reservation attempting to 
overallocate
+            assert_contains!(err.to_string(), "HashJoinExec");
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn partitioned_join_overallocation() -> Result<()> {
+        // Prepare partitioned inputs for HashJoinExec
+        // No need to adjust partitioning, as execution should fail with 
`Resources exhausted` error
+        let left_batch = build_table_i32(
+            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+        );
+        let left = Arc::new(
+            MemoryExec::try_new(
+                &[vec![left_batch.clone()], vec![left_batch.clone()]],
+                left_batch.schema(),
+                None,
+            )
+            .unwrap(),
+        );
+        let right_batch = build_table_i32(
+            ("a2", &vec![10, 11]),
+            ("b2", &vec![12, 13]),
+            ("c2", &vec![14, 15]),
+        );
+        let right = Arc::new(
+            MemoryExec::try_new(
+                &[vec![right_batch.clone()], vec![right_batch.clone()]],
+                right_batch.schema(),
+                None,
+            )
+            .unwrap(),
+        );
+        let on = vec![(
+            Column::new_with_schema("b1", &left_batch.schema())?,
+            Column::new_with_schema("b2", &right_batch.schema())?,
+        )];
+
+        let join_types = vec![
+            JoinType::Inner,
+            JoinType::Left,
+            JoinType::Right,
+            JoinType::Full,
+            JoinType::LeftSemi,
+            JoinType::LeftAnti,
+            JoinType::RightSemi,
+            JoinType::RightAnti,
+        ];
+
+        for join_type in join_types {
+            let runtime_config = RuntimeConfig::new().with_memory_limit(100, 
1.0);
+            let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
+            let session_config = SessionConfig::default().with_batch_size(50);
+            let session_ctx = SessionContext::with_config_rt(session_config, 
runtime);
+            let task_ctx = session_ctx.task_ctx();
+
+            let join = HashJoinExec::try_new(
+                left.clone(),
+                right.clone(),
+                on.clone(),
+                None,
+                &join_type,
+                PartitionMode::Partitioned,
+                &false,
+            )?;
+
+            let stream = join.execute(1, task_ctx)?;
+            let err = common::collect(stream).await.unwrap_err();
+
+            assert_contains!(
+                err.to_string(),
+                "External error: Resources exhausted: Failed to allocate 
additional"
+            );
+
+            // Asserting that stream-level reservation attempting to 
overallocate
+            assert_contains!(err.to_string(), "HashJoinStream[1]");
+        }
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/tests/memory_limit.rs 
b/datafusion/core/tests/memory_limit.rs
index 75e26485d..392e54941 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -74,6 +74,16 @@ async fn group_by_hash() {
     .await
 }
 
+#[tokio::test]
+async fn join_by_key() {
+    run_limit_test(
+        "select t1.* from t t1 JOIN t t2 ON t1.service = t2.service",
+        "Resources exhausted: Failed to allocate additional",
+        1_000,
+    )
+    .await
+}
+
 #[tokio::test]
 async fn cross_join() {
     run_limit_test(

Reply via email to