This is an automated email from the ASF dual-hosted git repository.
yjshen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 161c6d3282 Account for memory usage in SortPreservingMerge (#5885)
(#7130)
161c6d3282 is described below
commit 161c6d32824fc87307341f942ffad7b4d452c82f
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Aug 9 11:13:31 2023 -0500
Account for memory usage in SortPreservingMerge (#5885) (#7130)
* Account for memory usage in SortPreservingMerge
* Review Comments: Improve documentation and comments
* Review Comments: Improve documentation and comments
---
datafusion/common/src/config.rs | 16 +
.../core/src/physical_plan/repartition/mod.rs | 11 +-
datafusion/core/src/physical_plan/sorts/builder.rs | 22 +-
datafusion/core/src/physical_plan/sorts/cursor.rs | 20 +-
datafusion/core/src/physical_plan/sorts/merge.rs | 30 +-
datafusion/core/src/physical_plan/sorts/sort.rs | 131 +++++---
.../physical_plan/sorts/sort_preserving_merge.rs | 12 +-
datafusion/core/src/physical_plan/sorts/stream.rs | 12 +-
.../core/tests/fuzz_cases/order_spill_fuzz.rs | 26 +-
datafusion/core/tests/memory_limit.rs | 345 ++++++++++++++++++++-
.../test_files/information_schema.slt | 2 +
datafusion/execution/src/config.rs | 26 ++
datafusion/execution/src/disk_manager.rs | 9 +
docs/source/user-guide/configs.md | 2 +
14 files changed, 590 insertions(+), 74 deletions(-)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index f681ae57a3..fe7fb95503 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -235,6 +235,22 @@ config_namespace! {
///
/// Defaults to the number of CPU cores on the system
pub planning_concurrency: usize, default = num_cpus::get()
+
+ /// Specifies the reserved memory for each spillable sort operation to
+ /// facilitate an in-memory merge.
+ ///
+ /// When a sort operation spills to disk, the in-memory data must be
+ /// sorted and merged before being written to a file. This setting
reserves
+ /// a specific amount of memory for that in-memory sort/merge process.
+ ///
+ /// Note: This setting is irrelevant if the sort operation cannot spill
+ /// (i.e., if there's no `DiskManager` configured).
+ pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
+
+ /// When sorting, below what size should data be concatenated
+ /// and sorted in a single RecordBatch rather than sorted in
+ /// batches and merged.
+ pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
}
}
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs
b/datafusion/core/src/physical_plan/repartition/mod.rs
index 99b72a1b40..3f83e186ea 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -574,14 +574,21 @@ impl ExecutionPlan for RepartitionExec {
// Get existing ordering:
let sort_exprs = self.input.output_ordering().unwrap_or(&[]);
- // Merge streams (while preserving ordering) coming from input
partitions to this partition:
+
+ // Merge streams (while preserving ordering) coming from
+ // input partitions to this partition:
+ let fetch = None;
+ let merge_reservation =
+ MemoryConsumer::new(format!("{}[Merge {partition}]",
self.name()))
+ .register(context.memory_pool());
streaming_merge(
input_streams,
self.schema(),
sort_exprs,
BaselineMetrics::new(&self.metrics, partition),
context.session_config().batch_size(),
- None,
+ fetch,
+ merge_reservation,
)
} else {
Ok(Box::pin(RepartitionStream {
diff --git a/datafusion/core/src/physical_plan/sorts/builder.rs
b/datafusion/core/src/physical_plan/sorts/builder.rs
index 1c5ec356ee..3527d57382 100644
--- a/datafusion/core/src/physical_plan/sorts/builder.rs
+++ b/datafusion/core/src/physical_plan/sorts/builder.rs
@@ -19,6 +19,7 @@ use arrow::compute::interleave;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
#[derive(Debug, Copy, Clone, Default)]
struct BatchCursor {
@@ -37,6 +38,9 @@ pub struct BatchBuilder {
/// Maintain a list of [`RecordBatch`] and their corresponding stream
batches: Vec<(usize, RecordBatch)>,
+ /// Accounts for memory used by buffered batches
+ reservation: MemoryReservation,
+
/// The current [`BatchCursor`] for each stream
cursors: Vec<BatchCursor>,
@@ -47,23 +51,31 @@ pub struct BatchBuilder {
impl BatchBuilder {
/// Create a new [`BatchBuilder`] with the provided `stream_count` and
`batch_size`
- pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) ->
Self {
+ pub fn new(
+ schema: SchemaRef,
+ stream_count: usize,
+ batch_size: usize,
+ reservation: MemoryReservation,
+ ) -> Self {
Self {
schema,
batches: Vec::with_capacity(stream_count * 2),
cursors: vec![BatchCursor::default(); stream_count],
indices: Vec::with_capacity(batch_size),
+ reservation,
}
}
/// Append a new batch in `stream_idx`
- pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
+ pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) ->
Result<()> {
+ self.reservation.try_grow(batch.get_array_memory_size())?;
let batch_idx = self.batches.len();
self.batches.push((stream_idx, batch));
self.cursors[stream_idx] = BatchCursor {
batch_idx,
row_idx: 0,
- }
+ };
+ Ok(())
}
/// Append the next row from `stream_idx`
@@ -119,7 +131,7 @@ impl BatchBuilder {
// We can therefore drop all but the last batch for each stream
let mut batch_idx = 0;
let mut retained = 0;
- self.batches.retain(|(stream_idx, _)| {
+ self.batches.retain(|(stream_idx, batch)| {
let stream_cursor = &mut self.cursors[*stream_idx];
let retain = stream_cursor.batch_idx == batch_idx;
batch_idx += 1;
@@ -127,6 +139,8 @@ impl BatchBuilder {
if retain {
stream_cursor.batch_idx = retained;
retained += 1;
+ } else {
+ self.reservation.shrink(batch.get_array_memory_size());
}
retain
});
diff --git a/datafusion/core/src/physical_plan/sorts/cursor.rs
b/datafusion/core/src/physical_plan/sorts/cursor.rs
index a9e5122130..c0c7912886 100644
--- a/datafusion/core/src/physical_plan/sorts/cursor.rs
+++ b/datafusion/core/src/physical_plan/sorts/cursor.rs
@@ -21,6 +21,7 @@ use arrow::datatypes::ArrowNativeTypeOp;
use arrow::row::{Row, Rows};
use arrow_array::types::ByteArrayType;
use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
+use datafusion_execution::memory_pool::MemoryReservation;
use std::cmp::Ordering;
/// A [`Cursor`] for [`Rows`]
@@ -29,6 +30,11 @@ pub struct RowCursor {
num_rows: usize,
rows: Rows,
+
+ /// Tracks the memory used by the `Rows` of this
+ /// cursor. Freed on drop
+ #[allow(dead_code)]
+ reservation: MemoryReservation,
}
impl std::fmt::Debug for RowCursor {
@@ -41,12 +47,22 @@ impl std::fmt::Debug for RowCursor {
}
impl RowCursor {
- /// Create a new SortKeyCursor
- pub fn new(rows: Rows) -> Self {
+ /// Create a new SortKeyCursor from `rows` and a `reservation`
+ /// that tracks its memory.
+ ///
+ /// Panics if the reservation is not for exactly `rows.size()`
+ /// bytes
+ pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
+ assert_eq!(
+ rows.size(),
+ reservation.size(),
+ "memory reservation mismatch"
+ );
Self {
cur_row: 0,
num_rows: rows.num_rows(),
rows,
+ reservation,
}
}
diff --git a/datafusion/core/src/physical_plan/sorts/merge.rs
b/datafusion/core/src/physical_plan/sorts/merge.rs
index 736df7dbe8..f8a1457dd6 100644
--- a/datafusion/core/src/physical_plan/sorts/merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/merge.rs
@@ -31,6 +31,7 @@ use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::*;
use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
use futures::Stream;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
@@ -42,7 +43,7 @@ macro_rules! primitive_merge_helper {
}
macro_rules! merge_helper {
- ($t:ty, $sort:ident, $streams:ident, $schema:ident,
$tracking_metrics:ident, $batch_size:ident, $fetch:ident) => {{
+ ($t:ty, $sort:ident, $streams:ident, $schema:ident,
$tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident)
=> {{
let streams = FieldCursorStream::<$t>::new($sort, $streams);
return Ok(Box::pin(SortPreservingMergeStream::new(
Box::new(streams),
@@ -50,6 +51,7 @@ macro_rules! merge_helper {
$tracking_metrics,
$batch_size,
$fetch,
+ $reservation,
)));
}};
}
@@ -63,28 +65,36 @@ pub fn streaming_merge(
metrics: BaselineMetrics,
batch_size: usize,
fetch: Option<usize>,
+ reservation: MemoryReservation,
) -> Result<SendableRecordBatchStream> {
// Special case single column comparisons with optimized cursor
implementations
if expressions.len() == 1 {
let sort = expressions[0].clone();
let data_type = sort.expr.data_type(schema.as_ref())?;
downcast_primitive! {
- data_type => (primitive_merge_helper, sort, streams, schema,
metrics, batch_size, fetch),
- DataType::Utf8 => merge_helper!(StringArray, sort, streams,
schema, metrics, batch_size, fetch)
- DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort,
streams, schema, metrics, batch_size, fetch)
- DataType::Binary => merge_helper!(BinaryArray, sort, streams,
schema, metrics, batch_size, fetch)
- DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort,
streams, schema, metrics, batch_size, fetch)
+ data_type => (primitive_merge_helper, sort, streams, schema,
metrics, batch_size, fetch, reservation),
+ DataType::Utf8 => merge_helper!(StringArray, sort, streams,
schema, metrics, batch_size, fetch, reservation)
+ DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort,
streams, schema, metrics, batch_size, fetch, reservation)
+ DataType::Binary => merge_helper!(BinaryArray, sort, streams,
schema, metrics, batch_size, fetch, reservation)
+ DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort,
streams, schema, metrics, batch_size, fetch, reservation)
_ => {}
}
}
- let streams = RowCursorStream::try_new(schema.as_ref(), expressions,
streams)?;
+ let streams = RowCursorStream::try_new(
+ schema.as_ref(),
+ expressions,
+ streams,
+ reservation.new_empty(),
+ )?;
+
Ok(Box::pin(SortPreservingMergeStream::new(
Box::new(streams),
schema,
metrics,
batch_size,
fetch,
+ reservation,
)))
}
@@ -162,11 +172,12 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
metrics: BaselineMetrics,
batch_size: usize,
fetch: Option<usize>,
+ reservation: MemoryReservation,
) -> Self {
let stream_count = streams.partitions();
Self {
- in_progress: BatchBuilder::new(schema, stream_count, batch_size),
+ in_progress: BatchBuilder::new(schema, stream_count, batch_size,
reservation),
streams,
metrics,
aborted: false,
@@ -197,8 +208,7 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
Some(Err(e)) => Poll::Ready(Err(e)),
Some(Ok((cursor, batch))) => {
self.cursors[idx] = Some(cursor);
- self.in_progress.push_batch(idx, batch);
- Poll::Ready(Ok(()))
+ Poll::Ready(self.in_progress.push_batch(idx, batch))
}
}
}
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs
b/datafusion/core/src/physical_plan/sorts/sort.rs
index c7ae09bb2e..411a425b51 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -210,23 +210,37 @@ struct ExternalSorter {
/// If Some, the maximum number of output rows that will be
/// produced.
fetch: Option<usize>,
- /// Memory usage tracking
+ /// Reservation for in_mem_batches
reservation: MemoryReservation,
- /// The partition id that this Sort is handling (for identification)
- partition_id: usize,
- /// A handle to the runtime to get Disk spill files
+ /// Reservation for the merging of in-memory batches. If the sort
+ /// might spill, `sort_spill_reservation_bytes` will be
+ /// pre-reserved to ensure there is some space for this sort/merge.
+ merge_reservation: MemoryReservation,
+ /// A handle to the runtime to get spill files
runtime: Arc<RuntimeEnv>,
/// The target number of rows for output batches
batch_size: usize,
+ /// How much memory to reserve for performing in-memory sort/merges
+ /// prior to spilling.
+ sort_spill_reservation_bytes: usize,
+ /// If the size of the buffered in-memory batches is below this size,
+ /// the data will be concatenated and sorted in place rather than
+ /// sort/merged.
+ sort_in_place_threshold_bytes: usize,
}
impl ExternalSorter {
+ // TODO: make a builder or some other nicer API to avoid the
+ // clippy warning
+ #[allow(clippy::too_many_arguments)]
pub fn new(
partition_id: usize,
schema: SchemaRef,
expr: Vec<PhysicalSortExpr>,
batch_size: usize,
fetch: Option<usize>,
+ sort_spill_reservation_bytes: usize,
+ sort_in_place_threshold_bytes: usize,
metrics: &ExecutionPlanMetricsSet,
runtime: Arc<RuntimeEnv>,
) -> Self {
@@ -235,6 +249,10 @@ impl ExternalSorter {
.with_can_spill(true)
.register(&runtime.memory_pool);
+ let merge_reservation =
+ MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
+ .register(&runtime.memory_pool);
+
Self {
schema,
in_mem_batches: vec![],
@@ -244,9 +262,11 @@ impl ExternalSorter {
metrics,
fetch,
reservation,
- partition_id,
+ merge_reservation,
runtime,
batch_size,
+ sort_spill_reservation_bytes,
+ sort_in_place_threshold_bytes,
}
}
@@ -257,6 +277,7 @@ impl ExternalSorter {
if input.num_rows() == 0 {
return Ok(());
}
+ self.reserve_memory_for_merge()?;
let size = batch_byte_size(&input);
if self.reservation.try_grow(size).is_err() {
@@ -318,12 +339,10 @@ impl ExternalSorter {
self.metrics.baseline.clone(),
self.batch_size,
self.fetch,
+ self.reservation.new_empty(),
)
} else if !self.in_mem_batches.is_empty() {
- let result =
self.in_mem_sort_stream(self.metrics.baseline.clone());
- // Report to the memory manager we are no longer using memory
- self.reservation.free();
- result
+ self.in_mem_sort_stream(self.metrics.baseline.clone())
} else {
Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
}
@@ -374,6 +393,11 @@ impl ExternalSorter {
return Ok(());
}
+ // Release the memory reserved for merge back to the pool so
+ // there is some left when `in_mem_sort_stream` requests an
+ // allocation.
+ self.merge_reservation.free();
+
self.in_mem_batches = self
.in_mem_sort_stream(self.metrics.baseline.intermediate())?
.try_collect()
@@ -385,7 +409,10 @@ impl ExternalSorter {
.map(|x| x.get_array_memory_size())
.sum();
- self.reservation.resize(size);
+ // Reserve headroom for next sort/merge
+ self.reserve_memory_for_merge()?;
+
+ self.reservation.try_resize(size)?;
self.in_mem_batches_sorted = true;
Ok(())
}
@@ -455,26 +482,27 @@ impl ExternalSorter {
assert_ne!(self.in_mem_batches.len(), 0);
if self.in_mem_batches.len() == 1 {
let batch = self.in_mem_batches.remove(0);
- let stream = self.sort_batch_stream(batch, metrics)?;
- self.in_mem_batches.clear();
- return Ok(stream);
+ let reservation = self.reservation.take();
+ return self.sort_batch_stream(batch, metrics, reservation);
}
- // If less than 1MB of in-memory data, concatenate and sort in place
- //
- // This is a very rough heuristic and likely could be refined further
- if self.reservation.size() < 1048576 {
+ // If less than sort_in_place_threshold_bytes, concatenate and sort in
place
+ if self.reservation.size() < self.sort_in_place_threshold_bytes {
// Concatenate memory batches together and sort
let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
self.in_mem_batches.clear();
- return self.sort_batch_stream(batch, metrics);
+ self.reservation.try_resize(batch.get_array_memory_size())?;
+ let reservation = self.reservation.take();
+ return self.sort_batch_stream(batch, metrics, reservation);
}
let streams = std::mem::take(&mut self.in_mem_batches)
.into_iter()
.map(|batch| {
let metrics = self.metrics.baseline.intermediate();
- Ok(spawn_buffered(self.sort_batch_stream(batch, metrics)?, 1))
+ let reservation =
self.reservation.split(batch.get_array_memory_size());
+ let input = self.sort_batch_stream(batch, metrics,
reservation)?;
+ Ok(spawn_buffered(input, 1))
})
.collect::<Result<_>>()?;
@@ -485,35 +513,49 @@ impl ExternalSorter {
metrics,
self.batch_size,
self.fetch,
+ self.merge_reservation.new_empty(),
)
}
- /// Sorts a single `RecordBatch` into a single stream
+ /// Sorts a single `RecordBatch` into a single stream.
+ ///
+ /// `reservation` accounts for the memory used by this batch and
+ /// is released when the sort is complete
fn sort_batch_stream(
&self,
batch: RecordBatch,
metrics: BaselineMetrics,
+ reservation: MemoryReservation,
) -> Result<SendableRecordBatchStream> {
+ assert_eq!(batch.get_array_memory_size(), reservation.size());
let schema = batch.schema();
- let mut reservation =
- MemoryConsumer::new(format!("sort_batch_stream{}",
self.partition_id))
- .register(&self.runtime.memory_pool);
-
- // TODO: This should probably be try_grow (#5885)
- reservation.resize(batch.get_array_memory_size());
-
let fetch = self.fetch;
let expressions = self.expr.clone();
let stream = futures::stream::once(futures::future::lazy(move |_| {
let sorted = sort_batch(&batch, &expressions, fetch)?;
metrics.record_output(sorted.num_rows());
drop(batch);
- reservation.free();
+ drop(reservation);
Ok(sorted)
}));
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
+
+ /// If this sort may spill, pre-allocates
+ /// `sort_spill_reservation_bytes` of memory to guarantee memory
+ /// left for the in memory sort/merge.
+ fn reserve_memory_for_merge(&mut self) -> Result<()> {
+ // Reserve headroom for next merge sort
+ if self.runtime.disk_manager.tmp_files_enabled() {
+ let size = self.sort_spill_reservation_bytes;
+ if self.merge_reservation.size() != size {
+ self.merge_reservation.try_resize(size)?;
+ }
+ }
+
+ Ok(())
+ }
}
impl Debug for ExternalSorter {
@@ -801,6 +843,8 @@ impl ExecutionPlan for SortExec {
let mut input = self.input.execute(partition, context.clone())?;
+ let execution_options = &context.session_config().options().execution;
+
trace!("End SortExec's input.execute for partition: {}", partition);
let mut sorter = ExternalSorter::new(
@@ -809,6 +853,8 @@ impl ExecutionPlan for SortExec {
self.expr.clone(),
context.session_config().batch_size(),
self.fetch,
+ execution_options.sort_spill_reservation_bytes,
+ execution_options.sort_in_place_threshold_bytes,
&self.metrics_set,
context.runtime_env(),
);
@@ -914,9 +960,15 @@ mod tests {
#[tokio::test]
async fn test_sort_spill() -> Result<()> {
// trigger spill there will be 4 batches with 5.5KB for each
- let config = RuntimeConfig::new().with_memory_limit(12288, 1.0);
- let runtime = Arc::new(RuntimeEnv::new(config)?);
- let session_ctx = SessionContext::with_config_rt(SessionConfig::new(),
runtime);
+ let session_config = SessionConfig::new();
+ let sort_spill_reservation_bytes = session_config
+ .options()
+ .execution
+ .sort_spill_reservation_bytes;
+ let rt_config = RuntimeConfig::new()
+ .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0);
+ let runtime = Arc::new(RuntimeEnv::new(rt_config)?);
+ let session_ctx = SessionContext::with_config_rt(session_config,
runtime);
let partitions = 4;
let csv = test::scan_partitioned_csv(partitions)?;
@@ -996,11 +1048,18 @@ mod tests {
];
for (fetch, expect_spillage) in test_options {
- let config = RuntimeConfig::new()
- .with_memory_limit(avg_batch_size * (partitions - 1), 1.0);
- let runtime = Arc::new(RuntimeEnv::new(config)?);
- let session_ctx =
- SessionContext::with_config_rt(SessionConfig::new(), runtime);
+ let session_config = SessionConfig::new();
+ let sort_spill_reservation_bytes = session_config
+ .options()
+ .execution
+ .sort_spill_reservation_bytes;
+
+ let rt_config = RuntimeConfig::new().with_memory_limit(
+ sort_spill_reservation_bytes + avg_batch_size * (partitions -
1),
+ 1.0,
+ );
+ let runtime = Arc::new(RuntimeEnv::new(rt_config)?);
+ let session_ctx = SessionContext::with_config_rt(session_config,
runtime);
let csv = test::scan_partitioned_csv(partitions)?;
let schema = csv.schema();
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index e8d571631b..6b978b5ee7 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -30,6 +30,7 @@ use crate::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
SendableRecordBatchStream, Statistics,
};
+use datafusion_execution::memory_pool::MemoryConsumer;
use arrow::datatypes::SchemaRef;
use datafusion_common::{DataFusionError, Result};
@@ -213,6 +214,10 @@ impl ExecutionPlan for SortPreservingMergeExec {
);
let schema = self.schema();
+ let reservation =
+
MemoryConsumer::new(format!("SortPreservingMergeExec[{partition}]"))
+ .register(&context.runtime_env().memory_pool);
+
match input_partitions {
0 => Err(DataFusionError::Internal(
"SortPreservingMergeExec requires at least one input partition"
@@ -241,6 +246,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
BaselineMetrics::new(&self.metrics, partition),
context.session_config().batch_size(),
self.fetch,
+ reservation,
)?;
debug!("Got stream result from
SortPreservingMergeStream::new_from_receivers");
@@ -843,14 +849,18 @@ mod tests {
}
let metrics = ExecutionPlanMetricsSet::new();
+ let reservation =
+
MemoryConsumer::new("test").register(&task_ctx.runtime_env().memory_pool);
+ let fetch = None;
let merge_stream = streaming_merge(
streams,
batches.schema(),
sort.as_slice(),
BaselineMetrics::new(&metrics, 0),
task_ctx.session_config().batch_size(),
- None,
+ fetch,
+ reservation,
)
.unwrap();
diff --git a/datafusion/core/src/physical_plan/sorts/stream.rs
b/datafusion/core/src/physical_plan/sorts/stream.rs
index 97a3b85fa5..9ef13b7eb2 100644
--- a/datafusion/core/src/physical_plan/sorts/stream.rs
+++ b/datafusion/core/src/physical_plan/sorts/stream.rs
@@ -23,6 +23,7 @@ use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, SortField};
use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
use futures::stream::{Fuse, StreamExt};
use std::marker::PhantomData;
use std::sync::Arc;
@@ -84,6 +85,8 @@ pub struct RowCursorStream {
column_expressions: Vec<Arc<dyn PhysicalExpr>>,
/// Input streams
streams: FusedStreams,
+ /// Tracks the memory used by `converter`
+ reservation: MemoryReservation,
}
impl RowCursorStream {
@@ -91,6 +94,7 @@ impl RowCursorStream {
schema: &Schema,
expressions: &[PhysicalSortExpr],
streams: Vec<SendableRecordBatchStream>,
+ reservation: MemoryReservation,
) -> Result<Self> {
let sort_fields = expressions
.iter()
@@ -104,6 +108,7 @@ impl RowCursorStream {
let converter = RowConverter::new(sort_fields)?;
Ok(Self {
converter,
+ reservation,
column_expressions: expressions.iter().map(|x|
x.expr.clone()).collect(),
streams: FusedStreams(streams),
})
@@ -117,7 +122,12 @@ impl RowCursorStream {
.collect::<Result<Vec<_>>>()?;
let rows = self.converter.convert_columns(&cols)?;
- Ok(RowCursor::new(rows))
+ self.reservation.try_resize(self.converter.size())?;
+
+ // track the memory in the newly created Rows.
+ let mut rows_reservation = self.reservation.new_empty();
+ rows_reservation.try_grow(rows.size())?;
+ Ok(RowCursor::new(rows, rows_reservation))
}
}
diff --git a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs
b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs
index 1f72e0fcb4..d927b2807d 100644
--- a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs
@@ -22,13 +22,13 @@ use arrow::{
compute::SortOptions,
record_batch::RecordBatch,
};
-use datafusion::execution::memory_pool::GreedyMemoryPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_execution::memory_pool::GreedyMemoryPool;
use rand::Rng;
use std::sync::Arc;
use test_utils::{batches_to_vec, partitions_to_sorted_vec};
@@ -76,10 +76,20 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize,
bool)>) {
let exec = MemoryExec::try_new(&input, schema, None).unwrap();
let sort = Arc::new(SortExec::new(sort, Arc::new(exec)));
+ let session_config = SessionConfig::new();
+ // Make sure there is enough space for the initial spill
+ // reservation
+ let pool_size = pool_size.saturating_add(
+ session_config
+ .options()
+ .execution
+ .sort_spill_reservation_bytes,
+ );
+
let runtime_config = RuntimeConfig::new()
.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)));
let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
- let session_ctx = SessionContext::with_config_rt(SessionConfig::new(),
runtime);
+ let session_ctx = SessionContext::with_config_rt(session_config,
runtime);
let task_ctx = session_ctx.task_ctx();
let collected = collect(sort.clone(), task_ctx).await.unwrap();
@@ -88,9 +98,17 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize,
bool)>) {
let actual = batches_to_vec(&collected);
if spill {
- assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0);
+ assert_ne!(
+ sort.metrics().unwrap().spill_count().unwrap(),
+ 0,
+ "{pool_size} {size}"
+ );
} else {
- assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
+ assert_eq!(
+ sort.metrics().unwrap().spill_count().unwrap(),
+ 0,
+ "{pool_size} {size}"
+ );
}
assert_eq!(
diff --git a/datafusion/core/tests/memory_limit.rs
b/datafusion/core/tests/memory_limit.rs
index a7cff6cbd7..80bbbed8f0 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -17,12 +17,21 @@
//! This module contains tests for limiting memory at runtime in DataFusion
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{Int32Type, SchemaRef};
use arrow::record_batch::RecordBatch;
+use arrow_array::{ArrayRef, DictionaryArray};
+use arrow_schema::SortOptions;
+use async_trait::async_trait;
+use datafusion::assert_batches_eq;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::common::batch_byte_size;
+use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::streaming::PartitionStream;
+use datafusion_expr::{Expr, TableType};
+use datafusion_physical_expr::PhysicalSortExpr;
use futures::StreamExt;
-use std::sync::Arc;
+use std::any::Any;
+use std::sync::{Arc, OnceLock};
use datafusion::datasource::streaming::StreamingTable;
use datafusion::datasource::{MemTable, TableProvider};
@@ -31,8 +40,8 @@ use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::physical_optimizer::join_selection::JoinSelection;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion::physical_plan::SendableRecordBatchStream;
-use datafusion_common::assert_contains;
+use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
+use datafusion_common::{assert_contains, Result};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_execution::TaskContext;
@@ -196,6 +205,110 @@ async fn symmetric_hash_join() {
.await
}
+#[tokio::test]
+async fn sort_preserving_merge() {
+ let partition_size = batches_byte_size(&dict_batches());
+
+ TestCase::new(
+ // This query uses the exact same ordering as the input table
+ // so only a merge is needed
+ "select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10",
+ vec![
+ "Resources exhausted: Failed to allocate additional",
+ "SortPreservingMergeExec",
+ ],
+ // provide insufficient memory to merge
+ partition_size / 2,
+ )
+ // two partitions of data, so a merge is required
+ .with_scenario(Scenario::DictionaryStrings(2))
+ .with_expected_plan(
+ // It is important that this plan only has
+ // SortPreservingMergeExec (not a Sort which would compete
+ // with the SortPreservingMergeExec for memory)
+ &[
+
"+---------------+-------------------------------------------------------------------------------------------------------------+",
+ "| plan_type | plan
|",
+
"+---------------+-------------------------------------------------------------------------------------------------------------+",
+ "| logical_plan | Limit: skip=0, fetch=10
|",
+ "| | Sort: t.a ASC NULLS LAST, t.b ASC NULLS
LAST, fetch=10 |",
+ "| | TableScan: t projection=[a, b]
|",
+ "| physical_plan | GlobalLimitExec: skip=0, fetch=10
|",
+ "| | SortPreservingMergeExec: [a@0 ASC NULLS
LAST,b@1 ASC NULLS LAST], fetch=10 |",
+ "| | MemoryExec: partitions=2,
partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST
|",
+ "| |
|",
+
"+---------------+-------------------------------------------------------------------------------------------------------------+",
+ ]
+ )
+ .run()
+ .await
+}
+
+#[tokio::test]
+async fn sort_spill_reservation() {
+ let partition_size = batches_byte_size(&dict_batches());
+
+ let base_config = SessionConfig::new()
+ // do not allow the sort to use the 'concat in place' path
+ .with_sort_in_place_threshold_bytes(10);
+
+ // This test case shows how sort_spill_reservation works by
+ // purposely sorting data that requires non trivial memory to
+ // sort/merge.
+ let test = TestCase::new(
+ // This query uses a different order than the input table to
+ // force a sort. It also needs to have multiple columns to
+ // force RowFormat / interner that makes merge require
+ // substantial memory
+ "select * from t ORDER BY a , b DESC",
+ vec![], // expected errors set below
+ // enough memory to sort if we don't try to merge it all at once
+ (partition_size * 5) / 2,
+ )
+ // use a single partition so only a sort is needed
+ .with_scenario(Scenario::DictionaryStrings(1))
+ .with_disk_manager_config(DiskManagerConfig::NewOs)
+ .with_expected_plan(
+ // It is important that this plan only has a SortExec, not
+ // also merge, so we can ensure the sort could finish
+ // given enough merging memory
+ &[
+
"+---------------+--------------------------------------------------------------------------------------------------------+",
+ "| plan_type | plan
|",
+
"+---------------+--------------------------------------------------------------------------------------------------------+",
+ "| logical_plan | Sort: t.a ASC NULLS LAST, t.b DESC NULLS FIRST
|",
+ "| | TableScan: t projection=[a, b]
|",
+ "| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC]
|",
+ "| | MemoryExec: partitions=1, partition_sizes=[5],
output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
+ "| |
|",
+
"+---------------+--------------------------------------------------------------------------------------------------------+",
+ ]
+ );
+
+ let config = base_config
+ .clone()
+ // provide insufficient reserved space for merging,
+ // the sort will fail while trying to merge
+ .with_sort_spill_reservation_bytes(1024);
+
+ test.clone()
+ .with_expected_errors(vec![
+ "Resources exhausted: Failed to allocate additional",
+ "ExternalSorterMerge", // merging in sort fails
+ ])
+ .with_config(config)
+ .run()
+ .await;
+
+ let config = base_config
+ // this time reserve sufficient space up front for the merge,
+ // which will force the spills to happen with less buffered
+ // input and thus leave enough memory to merge.
+ .with_sort_spill_reservation_bytes(2 * partition_size);
+
+ test.with_config(config).with_expected_success().run().await;
+}
+
/// Run the query with the specified memory limit,
/// and verifies the expected errors are returned
#[derive(Clone, Debug)]
@@ -205,9 +318,17 @@ struct TestCase {
memory_limit: usize,
config: SessionConfig,
scenario: Scenario,
+ /// How should the disk manager (that allows spilling) be
+ /// configured? Defaults to `Disabled`
+ disk_manager_config: DiskManagerConfig,
+ /// Expected explain plan, if non empty
+ expected_plan: Vec<String>,
+ /// Is the plan expected to pass? Defaults to false
+ expected_success: bool,
}
impl TestCase {
+ // TODO remove expected errors and memory limits and query from constructor
fn new<'a>(
query: impl Into<String>,
expected_errors: impl IntoIterator<Item = &'a str>,
@@ -222,21 +343,56 @@ impl TestCase {
memory_limit,
config: SessionConfig::new(),
scenario: Scenario::AccessLog,
+ disk_manager_config: DiskManagerConfig::Disabled,
+ expected_plan: vec![],
+ expected_success: false,
}
}
+ /// Set a list of expected strings that must appear in any errors
+ fn with_expected_errors<'a>(
+ mut self,
+ expected_errors: impl IntoIterator<Item = &'a str>,
+ ) -> Self {
+ self.expected_errors =
+ expected_errors.into_iter().map(|s| s.to_string()).collect();
+ self
+ }
+
/// Specify the configuration to use
pub fn with_config(mut self, config: SessionConfig) -> Self {
self.config = config;
self
}
+ /// Mark that the test expects the query to run successfully
+ pub fn with_expected_success(mut self) -> Self {
+ self.expected_success = true;
+ self
+ }
+
/// Specify the scenario to run
pub fn with_scenario(mut self, scenario: Scenario) -> Self {
self.scenario = scenario;
self
}
+ /// Specify how the disk manager should be configured. If it is
+ /// enabled, operators that support it can spill
+ pub fn with_disk_manager_config(
+ mut self,
+ disk_manager_config: DiskManagerConfig,
+ ) -> Self {
+ self.disk_manager_config = disk_manager_config;
+ self
+ }
+
+ /// Specify an expected plan to review
+ pub fn with_expected_plan(mut self, expected_plan: &[&str]) -> Self {
+ self.expected_plan = expected_plan.iter().map(|s|
s.to_string()).collect();
+ self
+ }
+
/// Run the test, panic'ing on error
async fn run(self) {
let Self {
@@ -245,33 +401,62 @@ impl TestCase {
memory_limit,
config,
scenario,
+ disk_manager_config,
+ expected_plan,
+ expected_success,
} = self;
let table = scenario.table();
let rt_config = RuntimeConfig::new()
// do not allow spilling
- .with_disk_manager(DiskManagerConfig::Disabled)
+ .with_disk_manager(disk_manager_config)
.with_memory_limit(memory_limit, MEMORY_FRACTION);
let runtime = RuntimeEnv::new(rt_config).unwrap();
// Configure execution
- let state = SessionState::with_config_rt(config, Arc::new(runtime))
- .with_physical_optimizer_rules(scenario.rules());
+ let state = SessionState::with_config_rt(config, Arc::new(runtime));
+ let state = match scenario.rules() {
+ Some(rules) => state.with_physical_optimizer_rules(rules),
+ None => state,
+ };
let ctx = SessionContext::with_state(state);
ctx.register_table("t", table).expect("registering table");
let df = ctx.sql(&query).await.expect("Planning query");
+ if !expected_plan.is_empty() {
+ let expected_plan: Vec<_> =
+ expected_plan.iter().map(|s| s.as_str()).collect();
+ let actual_plan = df
+ .clone()
+ .explain(false, false)
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+ assert_batches_eq!(expected_plan, &actual_plan);
+ }
+
match df.collect().await {
Ok(_batches) => {
- panic!("Unexpected success when running, expected memory limit
failure")
+ if !expected_success {
+ panic!(
+ "Unexpected success when running, expected memory
limit failure"
+ )
+ }
}
Err(e) => {
- for error_substring in expected_errors {
- assert_contains!(e.to_string(), error_substring);
+ if expected_success {
+ panic!(
+ "Unexpected failure when running, expected success but
got: {e}"
+ )
+ } else {
+ for error_substring in expected_errors {
+ assert_contains!(e.to_string(), error_substring);
+ }
}
}
}
@@ -290,6 +475,9 @@ enum Scenario {
/// 1000 rows of access log data with batches of 50 rows in a
/// [`StreamingTable`]
AccessLogStreaming,
+
+ /// N partitions of sorted, dictionary-encoded strings
+ DictionaryStrings(usize),
}
impl Scenario {
@@ -317,24 +505,53 @@ impl Scenario {
.with_infinite_table(true);
Arc::new(table)
}
+ Self::DictionaryStrings(num_partitions) => {
+ use datafusion::physical_expr::expressions::col;
+ let batches: Vec<Vec<_>> = std::iter::repeat(dict_batches())
+ .take(*num_partitions)
+ .collect();
+
+ let schema = batches[0][0].schema();
+ let options = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+ let sort_information = vec![
+ PhysicalSortExpr {
+ expr: col("a", &schema).unwrap(),
+ options,
+ },
+ PhysicalSortExpr {
+ expr: col("b", &schema).unwrap(),
+ options,
+ },
+ ];
+
+ let table = SortedTableProvider::new(batches,
sort_information);
+ Arc::new(table)
+ }
}
}
- /// return the optimizer rules to use
- fn rules(&self) -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
+ /// return specific physical optimizer rules to use
+ fn rules(&self) -> Option<Vec<Arc<dyn PhysicalOptimizerRule + Send +
Sync>>> {
match self {
Self::AccessLog => {
// Disabling physical optimizer rules to avoid sorts /
// repartitions (since RepartitionExec / SortExec also
// has a memory budget which we'll likely hit first)
- vec![]
+ Some(vec![])
}
Self::AccessLogStreaming => {
// Disable all physical optimizer rules except the
// JoinSelection rule to avoid sorts or repartition,
// as they also have memory budgets that may be hit
// first
- vec![Arc::new(JoinSelection::new())]
+ Some(vec![Arc::new(JoinSelection::new())])
+ }
+ Self::DictionaryStrings(_) => {
+ // Use default rules
+ None
}
}
}
@@ -347,6 +564,56 @@ fn access_log_batches() -> Vec<RecordBatch> {
.collect()
}
+static DICT_BATCHES: OnceLock<Vec<RecordBatch>> = OnceLock::new();
+
+/// Returns 5 sorted string dictionary batches each with 50 rows with
+/// this schema.
+///
+/// a: Dictionary<Utf8, Int32>,
+/// b: Dictionary<Utf8, Int32>,
+fn dict_batches() -> Vec<RecordBatch> {
+ DICT_BATCHES.get_or_init(make_dict_batches).clone()
+}
+
+fn make_dict_batches() -> Vec<RecordBatch> {
+ let batch_size = 50;
+
+ let mut i = 0;
+ let gen = std::iter::from_fn(move || {
+ // create values like (shown for the first batch)
+ // 0000000000
+ // 0000000001
+ // ...
+ // 0000000049
+
+ let values: Vec<_> = (i..i + batch_size).map(|x|
format!("{x:010}")).collect();
+ //println!("values: \n{values:?}");
+ let array: DictionaryArray<Int32Type> =
+ values.iter().map(|s| s.as_str()).collect();
+ let array = Arc::new(array) as ArrayRef;
+ let batch =
+ RecordBatch::try_from_iter(vec![("a", array.clone()), ("b",
array)]).unwrap();
+
+ i += batch_size;
+ Some(batch)
+ });
+
+ let num_batches = 5;
+
+ let batches: Vec<_> = gen.take(num_batches).collect();
+
+ batches.iter().enumerate().for_each(|(i, batch)| {
+ println!("Dict batch[{i}] size is: {}", batch_byte_size(batch));
+ });
+
+ batches
+}
+
+// How many bytes does the memory from dict_batches consume?
+fn batches_byte_size(batches: &[RecordBatch]) -> usize {
+ batches.iter().map(batch_byte_size).sum()
+}
+
struct DummyStreamPartition {
schema: SchemaRef,
batches: Vec<RecordBatch>,
@@ -366,3 +633,53 @@ impl PartitionStream for DummyStreamPartition {
))
}
}
+
+/// Wrapper over a TableProvider that can provide ordering information
+struct SortedTableProvider {
+ schema: SchemaRef,
+ batches: Vec<Vec<RecordBatch>>,
+ sort_information: Vec<PhysicalSortExpr>,
+}
+
+impl SortedTableProvider {
+ fn new(
+ batches: Vec<Vec<RecordBatch>>,
+ sort_information: Vec<PhysicalSortExpr>,
+ ) -> Self {
+ let schema = batches[0][0].schema();
+ Self {
+ schema,
+ batches,
+ sort_information,
+ }
+ }
+}
+
+#[async_trait]
+impl TableProvider for SortedTableProvider {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ _state: &SessionState,
+ projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let mem_exec =
+ MemoryExec::try_new(&self.batches, self.schema(),
projection.cloned())?
+ .with_sort_information(self.sort_information.clone());
+
+ Ok(Arc::new(mem_exec))
+ }
+}
diff --git
a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index fcb818d5fd..162e208201 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -153,6 +153,8 @@ datafusion.execution.parquet.pushdown_filters false
datafusion.execution.parquet.reorder_filters false
datafusion.execution.parquet.skip_metadata true
datafusion.execution.planning_concurrency 13
+datafusion.execution.sort_in_place_threshold_bytes 1048576
+datafusion.execution.sort_spill_reservation_bytes 10485760
datafusion.execution.target_partitions 7
datafusion.execution.time_zone +00:00
datafusion.explain.logical_plan_only false
diff --git a/datafusion/execution/src/config.rs
b/datafusion/execution/src/config.rs
index c847849936..44fcc2ab49 100644
--- a/datafusion/execution/src/config.rs
+++ b/datafusion/execution/src/config.rs
@@ -287,6 +287,32 @@ impl SessionConfig {
self.options.optimizer.enable_round_robin_repartition
}
+ /// Set the size of [`sort_spill_reservation_bytes`] to control
+ /// memory pre-reservation
+ ///
+ /// [`sort_spill_reservation_bytes`]:
datafusion_common::config::ExecutionOptions::sort_spill_reservation_bytes
+ pub fn with_sort_spill_reservation_bytes(
+ mut self,
+ sort_spill_reservation_bytes: usize,
+ ) -> Self {
+ self.options.execution.sort_spill_reservation_bytes =
+ sort_spill_reservation_bytes;
+ self
+ }
+
+ /// Set the size of [`sort_in_place_threshold_bytes`]: the size below which data is
+ /// concatenated and sorted in a single `RecordBatch` rather than sorted in batches and merged.
+ ///
+ /// [`sort_in_place_threshold_bytes`]:
datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes
+ pub fn with_sort_in_place_threshold_bytes(
+ mut self,
+ sort_in_place_threshold_bytes: usize,
+ ) -> Self {
+ self.options.execution.sort_in_place_threshold_bytes =
+ sort_in_place_threshold_bytes;
+ self
+ }
+
/// Convert configuration options to name-value pairs with values
/// converted to strings.
///
diff --git a/datafusion/execution/src/disk_manager.rs
b/datafusion/execution/src/disk_manager.rs
index 107c58fbe3..e8d2ed9cc0 100644
--- a/datafusion/execution/src/disk_manager.rs
+++ b/datafusion/execution/src/disk_manager.rs
@@ -102,6 +102,13 @@ impl DiskManager {
}
}
+ /// Return true if this disk manager supports creating temporary
+ /// files. If this returns false, any call to `create_tmp_file`
+ /// will error.
+ pub fn tmp_files_enabled(&self) -> bool {
+ self.local_dirs.lock().is_some()
+ }
+
/// Return a temporary file from a randomized choice in the configured
locations
///
/// If the file can not be created for some reason, returns an
@@ -198,6 +205,7 @@ mod tests {
);
let dm = DiskManager::try_new(config)?;
+ assert!(dm.tmp_files_enabled());
let actual = dm.create_tmp_file("Testing")?;
// the file should be in one of the specified local directories
@@ -210,6 +218,7 @@ mod tests {
fn test_disabled_disk_manager() {
let config = DiskManagerConfig::Disabled;
let manager = DiskManager::try_new(config).unwrap();
+ assert!(!manager.tmp_files_enabled());
assert_eq!(
manager.create_tmp_file("Testing").unwrap_err().to_string(),
"Resources exhausted: Memory Exhausted while Testing (DiskManager
is disabled)",
diff --git a/docs/source/user-guide/configs.md
b/docs/source/user-guide/configs.md
index bff7cb4da0..63c9c064bc 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -57,6 +57,8 @@ Environment variables are read during `SessionConfig`
initialisation so they mus
| datafusion.execution.parquet.reorder_filters | false | If
true, filter expressions evaluated during the parquet decoding operation will
be reordered heuristically to minimize the cost of evaluation. If false, the
filters are applied in the same order as written in the query
[...]
| datafusion.execution.aggregate.scalar_update_factor | 10 |
Specifies the threshold for using `ScalarValue`s to update accumulators during
high-cardinality aggregations for each input batch. The aggregation is
considered high-cardinality if the number of affected groups is greater than or
equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are
utilized for updating accumulators, rather than the default batch-slice
approach. This can lead to perform [...]
| datafusion.execution.planning_concurrency | 0 |
Fan-out during initial physical planning. This is mostly use to plan `UNION`
children in parallel. Defaults to the number of CPU cores on the system
[...]
+| datafusion.execution.sort_spill_reservation_bytes | 10485760 |
Specifies the reserved memory for each spillable sort operation to facilitate
an in-memory merge. When a sort operation spills to disk, the in-memory data
must be sorted and merged before being written to a file. This setting reserves
a specific amount of memory for that in-memory sort/merge process. Note: This
setting is irrelevant if the sort operation cannot spill (i.e., if there's no
`DiskManager` configured) [...]
+| datafusion.execution.sort_in_place_threshold_bytes | 1048576 |
When sorting, below what size should data be concatenated and sorted in a
single RecordBatch rather than sorted in batches and merged.
[...]
| datafusion.optimizer.enable_round_robin_repartition | true |
When set to true, the physical plan optimizer will try to add round robin
repartitioning to increase parallelism to leverage more CPU cores
[...]
| datafusion.optimizer.filter_null_join_keys | false |
When set to true, the optimizer will insert filters before a join between a
nullable and non-nullable column to filter out nulls on the nullable side. This
filter can add additional overhead when the file format does not fully support
predicate push down.
[...]
| datafusion.optimizer.repartition_aggregations | true |
Should DataFusion repartition data using the aggregate keys to execute
aggregates in parallel using the provided `target_partitions` level
[...]