This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f5d23ff58 memory limited hash join (#5490)
f5d23ff58 is described below
commit f5d23ff582f1393d5ce94cf977d11e59f90fe523
Author: Eduard Karacharov <[email protected]>
AuthorDate: Thu Mar 9 15:36:49 2023 +0300
memory limited hash join (#5490)
---
datafusion/core/src/physical_plan/common.rs | 6 +
.../core/src/physical_plan/joins/cross_join.rs | 36 +-
.../core/src/physical_plan/joins/hash_join.rs | 379 ++++++++++++++-------
datafusion/core/tests/memory_limit.rs | 10 +
4 files changed, 289 insertions(+), 142 deletions(-)
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 68f888210..2f02aaa27 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -39,8 +39,14 @@ use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
+/// [`MemoryReservation`] used across query execution streams
pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
+/// [`MemoryReservation`] used at query operator level
+/// The `Option` wrapper allows initializing an empty reservation in the operator constructor,
+/// and setting the actual reservation at stream level.
+pub(crate) type OperatorMemoryReservation = Arc<Mutex<Option<SharedMemoryReservation>>>;
+
/// Stream of record batches
pub struct SizedRecordBatchStream {
schema: SchemaRef,
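
The two aliases differ only in lifecycle: a shared reservation is created once and handed to streams, while the operator-level one starts empty and is filled in lazily on first execute(). A minimal standalone sketch of that pattern (the `MemoryReservation` here is a stand-in for DataFusion's real type, not code from this patch):

    use std::sync::Arc;
    use parking_lot::Mutex;

    // Stand-in for datafusion's execution::memory_pool::MemoryReservation.
    struct MemoryReservation;

    type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
    type OperatorMemoryReservation = Arc<Mutex<Option<SharedMemoryReservation>>>;

    // First caller initializes the reservation; later callers reuse it.
    fn get_or_init(op: &OperatorMemoryReservation) -> SharedMemoryReservation {
        op.lock()
            .get_or_insert_with(|| Arc::new(Mutex::new(MemoryReservation)))
            .clone()
    }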
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs
index 0523a2e35..d4933b9d6 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -27,7 +27,7 @@ use arrow::record_batch::RecordBatch;
use crate::execution::context::TaskContext;
use crate::execution::memory_pool::MemoryConsumer;
-use crate::physical_plan::common::SharedMemoryReservation;
+use crate::physical_plan::common::{OperatorMemoryReservation, SharedMemoryReservation};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
coalesce_batches::concat_batches,
coalesce_partitions::CoalescePartitionsExec,
@@ -60,6 +60,8 @@ pub struct CrossJoinExec {
schema: SchemaRef,
/// Build-side data
left_fut: OnceAsync<JoinLeftData>,
+ /// Memory reservation for build-side data
+ reservation: OperatorMemoryReservation,
/// Execution plan metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -83,6 +85,7 @@ impl CrossJoinExec {
right,
schema,
left_fut: Default::default(),
+ reservation: Default::default(),
metrics: ExecutionPlanMetricsSet::default(),
}
}
@@ -221,17 +224,29 @@ impl ExecutionPlan for CrossJoinExec {
let stream = self.right.execute(partition, context.clone())?;
let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
- let reservation = Arc::new(Mutex::new(
- MemoryConsumer::new(format!("CrossJoinStream[{partition}]"))
- .register(context.memory_pool()),
- ));
+
+ // Initialization of operator-level reservation
+ {
+ let mut reservation_lock = self.reservation.lock();
+ if reservation_lock.is_none() {
+ *reservation_lock = Some(Arc::new(Mutex::new(
+ MemoryConsumer::new("CrossJoinExec").register(context.memory_pool()),
+ )));
+ };
+ }
+
+ let reservation = self.reservation.lock().clone().ok_or_else(|| {
+ DataFusionError::Internal(
+ "Operator-level memory reservation is not
initialized".to_string(),
+ )
+ })?;
let left_fut = self.left_fut.once(|| {
load_left_input(
self.left.clone(),
context,
join_metrics.clone(),
- reservation.clone(),
+ reservation,
)
});
@@ -242,7 +257,6 @@ impl ExecutionPlan for CrossJoinExec {
right_batch: Arc::new(parking_lot::Mutex::new(None)),
left_index: 0,
join_metrics,
- reservation,
}))
}
@@ -346,8 +360,6 @@ struct CrossJoinStream {
right_batch: Arc<parking_lot::Mutex<Option<RecordBatch>>>,
/// join execution metrics
join_metrics: BuildProbeJoinMetrics,
- /// memory reservation
- reservation: SharedMemoryReservation,
}
impl RecordBatchStream for CrossJoinStream {
@@ -452,10 +464,7 @@ impl CrossJoinStream {
Some(result)
}
- other => {
- self.reservation.lock().free();
- other
- }
+ other => other,
})
}
}
@@ -683,6 +692,7 @@ mod tests {
err.to_string(),
"External error: Resources exhausted: Failed to allocate
additional"
);
+ assert_contains!(err.to_string(), "CrossJoinExec");
Ok(())
}
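
Note the removal of the explicit `free()` above: as the comment added to hash_join.rs below explains, the patch relies on `MemoryReservation`'s destructor to return memory to the pool. A hedged sketch of that RAII behavior against DataFusion's memory-pool API of this era (exact module paths may differ):

    use std::sync::Arc;
    use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

    fn raii_demo() -> datafusion::error::Result<()> {
        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
        {
            let mut reservation = MemoryConsumer::new("scoped").register(&pool);
            reservation.try_grow(512)?;
            assert_eq!(pool.reserved(), 512);
        } // reservation dropped here; its Drop impl releases the 512 bytes
        assert_eq!(pool.reserved(), 0);
        Ok(())
    }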
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 021f41ef1..03ac5050f 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -32,11 +32,11 @@ use arrow::{
Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type,
UInt8Type,
},
+ util::bit_util,
};
use smallvec::{smallvec, SmallVec};
use std::sync::Arc;
-use std::{any::Any, usize};
-use std::{time::Instant, vec};
+use std::{any::Any, usize, vec};
use futures::{ready, Stream, StreamExt, TryStreamExt};
@@ -58,15 +58,17 @@ use hashbrown::raw::RawTable;
use crate::physical_plan::{
coalesce_batches::concat_batches,
coalesce_partitions::CoalescePartitionsExec,
+ common::{OperatorMemoryReservation, SharedMemoryReservation},
expressions::Column,
expressions::PhysicalSortExpr,
hash_utils::create_hashes,
joins::utils::{
adjust_right_output_partitioning, build_join_schema, check_join_is_valid,
combine_join_equivalence_properties, estimate_join_statistics,
- partitioned_join_output_partitioning, ColumnIndex, JoinFilter, JoinOn,
+ partitioned_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex,
+ JoinFilter, JoinOn,
},
- metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
+ metrics::{ExecutionPlanMetricsSet, MetricsSet},
DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
@@ -76,7 +78,7 @@ use crate::logical_expr::JoinType;
use crate::arrow::array::BooleanBufferBuilder;
use crate::arrow::datatypes::TimeUnit;
-use crate::execution::context::TaskContext;
+use crate::execution::{context::TaskContext, memory_pool::MemoryConsumer};
use super::{
utils::{OnceAsync, OnceFut},
@@ -86,7 +88,7 @@ use crate::physical_plan::joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices,
build_batch_from_indices,
get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide,
};
-use log::debug;
+use parking_lot::Mutex;
use std::fmt;
use std::task::Poll;
@@ -134,6 +136,8 @@ pub struct HashJoinExec {
schema: SchemaRef,
/// Build-side data
left_fut: OnceAsync<JoinLeftData>,
+ /// Operator-level memory reservation for left data
+ reservation: OperatorMemoryReservation,
/// Shares the `RandomState` for the hashing algorithm
random_state: RandomState,
/// Partitioning mode to use
@@ -146,50 +150,6 @@ pub struct HashJoinExec {
pub(crate) null_equals_null: bool,
}
-/// Metrics for HashJoinExec
-#[derive(Debug)]
-struct HashJoinMetrics {
- /// Total time for joining probe-side batches to the build-side batches
- probe_time: metrics::Time,
- /// Total time for building hashmap
- build_time: metrics::Time,
- /// Number of batches consumed by this operator
- input_batches: metrics::Count,
- /// Number of rows consumed by this operator
- input_rows: metrics::Count,
- /// Number of batches produced by this operator
- output_batches: metrics::Count,
- /// Number of rows produced by this operator
- output_rows: metrics::Count,
-}
-
-impl HashJoinMetrics {
- pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
- let probe_time = MetricBuilder::new(metrics).subset_time("probe_time",
partition);
-
- let build_time = MetricBuilder::new(metrics).subset_time("build_time",
partition);
-
- let input_batches =
- MetricBuilder::new(metrics).counter("input_batches", partition);
-
- let input_rows = MetricBuilder::new(metrics).counter("input_rows",
partition);
-
- let output_batches =
- MetricBuilder::new(metrics).counter("output_batches", partition);
-
- let output_rows = MetricBuilder::new(metrics).output_rows(partition);
-
- Self {
- probe_time,
- build_time,
- input_batches,
- input_rows,
- output_batches,
- output_rows,
- }
- }
-}
-
impl HashJoinExec {
/// Tries to create a new [HashJoinExec].
/// # Error
@@ -226,6 +186,7 @@ impl HashJoinExec {
join_type: *join_type,
schema: Arc::new(schema),
left_fut: Default::default(),
+ reservation: Default::default(),
random_state,
mode: partition_mode,
metrics: ExecutionPlanMetricsSet::new(),
@@ -414,21 +375,57 @@ impl ExecutionPlan for HashJoinExec {
let on_left = self.on.iter().map(|on| on.0.clone()).collect::<Vec<_>>();
let on_right = self.on.iter().map(|on| on.1.clone()).collect::<Vec<_>>();
+ let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
+
+ // Initialization of operator-level reservation
+ {
+ let mut operator_reservation_lock = self.reservation.lock();
+ if operator_reservation_lock.is_none() {
+ *operator_reservation_lock = Some(Arc::new(Mutex::new(
+ MemoryConsumer::new("HashJoinExec").register(context.memory_pool()),
+ )));
+ };
+ }
+
+ let operator_reservation = self.reservation.lock().clone().ok_or_else(|| {
+ DataFusionError::Internal(
+ "Operator-level memory reservation is not initialized".to_string(),
+ )
+ })?;
+
+ // Initialization of stream-level reservation
+ let reservation = Arc::new(Mutex::new(
+ MemoryConsumer::new(format!("HashJoinStream[{partition}]"))
+ .register(context.memory_pool()),
+ ));
+
+ // Memory reservation for left-side data depends on PartitionMode:
+ // - operator-level for `CollectLeft` mode
+ // - stream-level for partitioned mode
+ //
+ // This approach avoids cases where left-side data could potentially
+ // outlive its memory reservation, and relies on `MemoryReservation`
+ // destructors for releasing memory back to the pool.
let left_fut = match self.mode {
PartitionMode::CollectLeft => self.left_fut.once(|| {
collect_left_input(
+ None,
self.random_state.clone(),
self.left.clone(),
on_left.clone(),
context.clone(),
+ join_metrics.clone(),
+ operator_reservation.clone(),
)
}),
- PartitionMode::Partitioned => OnceFut::new(partitioned_left_input(
- partition,
+ PartitionMode::Partitioned => OnceFut::new(collect_left_input(
+ Some(partition),
self.random_state.clone(),
self.left.clone(),
on_left.clone(),
context.clone(),
+ join_metrics.clone(),
+ reservation.clone(),
)),
PartitionMode::Auto => {
return Err(DataFusionError::Plan(format!(
@@ -453,9 +450,10 @@ impl ExecutionPlan for HashJoinExec {
right: right_stream,
column_indices: self.column_indices.clone(),
random_state: self.random_state.clone(),
- join_metrics: HashJoinMetrics::new(partition, &self.metrics),
+ join_metrics,
null_equals_null: self.null_equals_null,
is_exhausted: false,
+ reservation,
}))
}
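
To restate the mode-dependent choice above as code, a minimal standalone sketch (stand-in types, not the real handles): `CollectLeft` shares one build side across all output partitions, so its memory is tracked once at the operator level; `Partitioned` builds one hash table per output partition, so each stream tracks its own.

    #[derive(Clone, Copy)]
    enum PartitionMode { CollectLeft, Partitioned }

    // Stand-in for a shared reservation handle.
    #[derive(Clone)]
    struct Reservation(&'static str);

    // Pick the reservation that should account for build-side data.
    fn build_side_reservation(
        mode: PartitionMode,
        operator: &Reservation, // lives as long as the operator
        stream: &Reservation,   // lives as long as one output stream
    ) -> Reservation {
        match mode {
            PartitionMode::CollectLeft => operator.clone(),
            PartitionMode::Partitioned => stream.clone(),
        }
    }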
@@ -493,90 +491,73 @@ impl ExecutionPlan for HashJoinExec {
}
async fn collect_left_input(
+ partition: Option<usize>,
random_state: RandomState,
left: Arc<dyn ExecutionPlan>,
on_left: Vec<Column>,
context: Arc<TaskContext>,
+ metrics: BuildProbeJoinMetrics,
+ reservation: SharedMemoryReservation,
) -> Result<JoinLeftData> {
let schema = left.schema();
- let start = Instant::now();
- // merge all left parts into a single stream
- let merge = {
- if left.output_partitioning().partition_count() != 1 {
- Arc::new(CoalescePartitionsExec::new(left))
- } else {
- left
- }
- };
- let stream = merge.execute(0, context)?;
-
- // This operation performs 2 steps at once:
- // 1. creates a [JoinHashMap] of all batches from the stream
- // 2. stores the batches in a vector.
- let initial = (0, Vec::new());
- let (num_rows, batches) = stream
- .try_fold(initial, |mut acc, batch| async {
- acc.0 += batch.num_rows();
- acc.1.push(batch);
- Ok(acc)
- })
- .await?;
- let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
- let mut hashes_buffer = Vec::new();
- let mut offset = 0;
- for batch in batches.iter() {
- hashes_buffer.clear();
- hashes_buffer.resize(batch.num_rows(), 0);
- update_hash(
- &on_left,
- batch,
- &mut hashmap,
- offset,
- &random_state,
- &mut hashes_buffer,
- )?;
- offset += batch.num_rows();
- }
- // Merge all batches into a single batch, so we
- // can directly index into the arrays
- let single_batch = concat_batches(&schema, &batches, num_rows)?;
-
- debug!(
- "Built build-side of hash join containing {} rows in {} ms",
- num_rows,
- start.elapsed().as_millis()
- );
-
- Ok((hashmap, single_batch))
-}
-
-async fn partitioned_left_input(
- partition: usize,
- random_state: RandomState,
- left: Arc<dyn ExecutionPlan>,
- on_left: Vec<Column>,
- context: Arc<TaskContext>,
-) -> Result<JoinLeftData> {
- let schema = left.schema();
+ let (left_input, left_input_partition) = if let Some(partition) = partition {
+ (left, partition)
+ } else {
+ let merge = {
+ if left.output_partitioning().partition_count() != 1 {
+ Arc::new(CoalescePartitionsExec::new(left))
+ } else {
+ left
+ }
+ };
- let start = Instant::now();
+ (merge, 0)
+ };
- // Load 1 partition of left side in memory
- let stream = left.execute(partition, context.clone())?;
+ // Depending on the partition argument, load a single partition or the whole left side into memory
+ let stream = left_input.execute(left_input_partition, context.clone())?;
// This operation performs 2 steps at once:
// 1. creates a [JoinHashMap] of all batches from the stream
// 2. stores the batches in a vector.
- let initial = (0, Vec::new());
- let (num_rows, batches) = stream
+ let initial = (Vec::new(), 0, metrics, reservation);
+ let (batches, num_rows, metrics, reservation) = stream
.try_fold(initial, |mut acc, batch| async {
- acc.0 += batch.num_rows();
- acc.1.push(batch);
+ let batch_size = batch.get_array_memory_size();
+ // Reserve memory for incoming batch
+ acc.3.lock().try_grow(batch_size)?;
+ // Update metrics
+ acc.2.build_mem_used.add(batch_size);
+ acc.2.build_input_batches.add(1);
+ acc.2.build_input_rows.add(batch.num_rows());
+ // Update rowcount
+ acc.1 += batch.num_rows();
+ // Push batch to output
+ acc.0.push(batch);
Ok(acc)
})
.await?;
+ // Estimation of the memory size required for the hashtable, prior to allocation.
+ // The final result can be verified using `RawTable.allocation_info()`.
+ //
+ // In the majority of cases hashbrown overestimates the bucket count to keep ~1/8 of them empty.
+ // This formula leads to overallocation for small tables (< 8 elements) but is fine overall.
+ let estimated_buckets = (num_rows.checked_mul(8).ok_or_else(|| {
+ DataFusionError::Execution(
+ "usize overflow while estimating number of hasmap
buckets".to_string(),
+ )
+ })? / 7)
+ .next_power_of_two();
+ // 32 bytes per `(u64, SmallVec<[u64; 1]>)`
+ // + 1 byte for each bucket
+ // + 16 bytes fixed
+ let estimated_hashtable_size = 32 * estimated_buckets + estimated_buckets + 16;
+
+ reservation.lock().try_grow(estimated_hashtable_size)?;
+ metrics.build_mem_used.add(estimated_hashtable_size);
+
let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
let mut hashes_buffer = Vec::new();
let mut offset = 0;
@@ -597,13 +578,6 @@ async fn partitioned_left_input(
// can directly index into the arrays
let single_batch = concat_batches(&schema, &batches, num_rows)?;
- debug!(
- "Built build-side {} of hash join containing {} rows in {} ms",
- partition,
- num_rows,
- start.elapsed().as_millis()
- );
-
Ok((hashmap, single_batch))
}
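
The estimation arithmetic above is easy to check by hand; a standalone sketch of the same formula with a worked example (not code from the patch):

    // For num_rows = 10_000:
    //   estimated_buckets = (10_000 * 8 / 7).next_power_of_two()
    //                     = 11_428.next_power_of_two() = 16_384
    //   estimated size    = 32 * 16_384 + 16_384 + 16 = 540_688 bytes (~528 KiB)
    fn estimate_hashtable_size(num_rows: usize) -> usize {
        let estimated_buckets = (num_rows * 8 / 7).next_power_of_two();
        // 32 bytes per (u64, SmallVec<[u64; 1]>) entry, 1 control byte
        // per bucket, plus a small fixed overhead.
        32 * estimated_buckets + estimated_buckets + 16
    }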
@@ -667,11 +641,13 @@ struct HashJoinStream {
/// There is nothing to process anymore, and the left side is processed in case of a left join
is_exhausted: bool,
/// Metrics
- join_metrics: HashJoinMetrics,
+ join_metrics: BuildProbeJoinMetrics,
/// Information of index and left / right placement of columns
column_indices: Vec<ColumnIndex>,
/// If null_equals_null is true, null == null else null != null
null_equals_null: bool,
+ /// Memory reservation
+ reservation: SharedMemoryReservation,
}
impl RecordBatchStream for HashJoinStream {
@@ -1173,6 +1149,18 @@ impl HashJoinStream {
};
build_timer.done();
+ // Reserve memory for the visited_left_side bitmap in case it hasn't been initialized yet
+ // and the join_type requires storing it
+ if self.visited_left_side.is_none()
+ && need_produce_result_in_final(self.join_type)
+ {
+ // TODO: Replace `ceil` wrapper with stable `div_ceil` after
+ // https://github.com/rust-lang/rust/issues/88581
+ let visited_bitmap_size = bit_util::ceil(left_data.1.num_rows(), 8);
+ self.reservation.lock().try_grow(visited_bitmap_size)?;
+ self.join_metrics.build_mem_used.add(visited_bitmap_size);
+ }
+
let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
let num_rows = left_data.1.num_rows();
if need_produce_result_in_final(self.join_type) {
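
The bitmap costs one bit per build-side row, rounded up to whole bytes. A standalone sketch of the sizing (using arrow's `bit_util::ceil`, as in the hunk above):

    use arrow::util::bit_util;

    // 10 rows -> ceil(10, 8) = 2 bytes; 1_000_000 rows -> 125_000 bytes.
    fn visited_bitmap_bytes(num_left_rows: usize) -> usize {
        bit_util::ceil(num_left_rows, 8)
    }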
@@ -1196,7 +1184,7 @@ impl HashJoinStream {
Some(Ok(batch)) => {
self.join_metrics.input_batches.add(1);
self.join_metrics.input_rows.add(batch.num_rows());
- let timer = self.join_metrics.probe_time.timer();
+ let timer = self.join_metrics.join_time.timer();
// get the matched two indices for the on condition
let left_right_indices = build_join_indices(
@@ -1252,7 +1240,7 @@ impl HashJoinStream {
result
}
None => {
- let timer = self.join_metrics.probe_time.timer();
+ let timer = self.join_metrics.join_time.timer();
if need_produce_result_in_final(self.join_type) &&
!self.is_exhausted
{
// use the global left bitmap to produce the left indices and right indices
@@ -1309,10 +1297,13 @@ mod tests {
use std::sync::Arc;
use super::*;
+ use crate::execution::context::SessionConfig;
use crate::physical_expr::expressions::BinaryExpr;
use crate::prelude::SessionContext;
use crate::{
assert_batches_sorted_eq,
+ common::assert_contains,
+ execution::runtime_env::{RuntimeConfig, RuntimeEnv},
physical_plan::{
common,
expressions::Column,
@@ -3040,4 +3031,134 @@ mod tests {
);
}
}
+
+ #[tokio::test]
+ async fn single_partition_join_overallocation() -> Result<()> {
+ let left = build_table(
+ ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+ ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+ ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+ );
+ let right = build_table(
+ ("a2", &vec![10, 11]),
+ ("b2", &vec![12, 13]),
+ ("c2", &vec![14, 15]),
+ );
+ let on = vec![(
+ Column::new_with_schema("a1", &left.schema()).unwrap(),
+ Column::new_with_schema("b2", &right.schema()).unwrap(),
+ )];
+
+ let join_types = vec![
+ JoinType::Inner,
+ JoinType::Left,
+ JoinType::Right,
+ JoinType::Full,
+ JoinType::LeftSemi,
+ JoinType::LeftAnti,
+ JoinType::RightSemi,
+ JoinType::RightAnti,
+ ];
+
+ for join_type in join_types {
+ let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
+ let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
+ let session_ctx =
+ SessionContext::with_config_rt(SessionConfig::default(), runtime);
+ let task_ctx = session_ctx.task_ctx();
+
+ let join = join(left.clone(), right.clone(), on.clone(), &join_type, false)?;
+
+ let stream = join.execute(0, task_ctx)?;
+ let err = common::collect(stream).await.unwrap_err();
+
+ assert_contains!(
+ err.to_string(),
+ "External error: Resources exhausted: Failed to allocate
additional"
+ );
+
+ // Assert that the operator-level reservation attempted to overallocate
+ assert_contains!(err.to_string(), "HashJoinExec");
+ }
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn partitioned_join_overallocation() -> Result<()> {
+ // Prepare partitioned inputs for HashJoinExec
+ // No need to adjust partitioning, as execution should fail with a `Resources exhausted` error
+ let left_batch = build_table_i32(
+ ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+ ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+ ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+ );
+ let left = Arc::new(
+ MemoryExec::try_new(
+ &[vec![left_batch.clone()], vec![left_batch.clone()]],
+ left_batch.schema(),
+ None,
+ )
+ .unwrap(),
+ );
+ let right_batch = build_table_i32(
+ ("a2", &vec![10, 11]),
+ ("b2", &vec![12, 13]),
+ ("c2", &vec![14, 15]),
+ );
+ let right = Arc::new(
+ MemoryExec::try_new(
+ &[vec![right_batch.clone()], vec![right_batch.clone()]],
+ right_batch.schema(),
+ None,
+ )
+ .unwrap(),
+ );
+ let on = vec![(
+ Column::new_with_schema("b1", &left_batch.schema())?,
+ Column::new_with_schema("b2", &right_batch.schema())?,
+ )];
+
+ let join_types = vec![
+ JoinType::Inner,
+ JoinType::Left,
+ JoinType::Right,
+ JoinType::Full,
+ JoinType::LeftSemi,
+ JoinType::LeftAnti,
+ JoinType::RightSemi,
+ JoinType::RightAnti,
+ ];
+
+ for join_type in join_types {
+ let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
+ let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
+ let session_config = SessionConfig::default().with_batch_size(50);
+ let session_ctx = SessionContext::with_config_rt(session_config, runtime);
+ let task_ctx = session_ctx.task_ctx();
+
+ let join = HashJoinExec::try_new(
+ left.clone(),
+ right.clone(),
+ on.clone(),
+ None,
+ &join_type,
+ PartitionMode::Partitioned,
+ &false,
+ )?;
+
+ let stream = join.execute(1, task_ctx)?;
+ let err = common::collect(stream).await.unwrap_err();
+
+ assert_contains!(
+ err.to_string(),
+ "External error: Resources exhausted: Failed to allocate
additional"
+ );
+
+ // Assert that the stream-level reservation attempted to overallocate
+ assert_contains!(err.to_string(), "HashJoinStream[1]");
+ }
+
+ Ok(())
+ }
}
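
Both new tests key their assertions off the consumer names ("HashJoinExec" for the operator-level reservation, "HashJoinStream[1]" for the stream-level one), which the memory pool embeds in its error message. A hedged sketch of that mechanism against DataFusion's memory-pool API of this era (exact module paths may differ):

    use std::sync::Arc;
    use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

    fn consumer_name_in_error() {
        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
        let mut reservation = MemoryConsumer::new("HashJoinStream[1]").register(&pool);
        // Growing past the 100-byte limit fails, and the resulting
        // "Resources exhausted" error carries the consumer name.
        let err = reservation.try_grow(1_000).unwrap_err();
        assert!(err.to_string().contains("HashJoinStream[1]"));
    }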
diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
index 75e26485d..392e54941 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -74,6 +74,16 @@ async fn group_by_hash() {
.await
}
+#[tokio::test]
+async fn join_by_key() {
+ run_limit_test(
+ "select t1.* from t t1 JOIN t t2 ON t1.service = t2.service",
+ "Resources exhausted: Failed to allocate additional",
+ 1_000,
+ )
+ .await
+}
+
#[tokio::test]
async fn cross_join() {
run_limit_test(