This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 161c6d3282 Account for memory usage in SortPreservingMerge (#5885) 
(#7130)
161c6d3282 is described below

commit 161c6d32824fc87307341f942ffad7b4d452c82f
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Aug 9 11:13:31 2023 -0500

    Account for memory usage in SortPreservingMerge (#5885) (#7130)
    
    * Account for memory usage in SortPreservingMerge
    
    * Review Comments: Improve documentation and comments
    
    * Review Comments: Improve documentation and comments
---
 datafusion/common/src/config.rs                    |  16 +
 .../core/src/physical_plan/repartition/mod.rs      |  11 +-
 datafusion/core/src/physical_plan/sorts/builder.rs |  22 +-
 datafusion/core/src/physical_plan/sorts/cursor.rs  |  20 +-
 datafusion/core/src/physical_plan/sorts/merge.rs   |  30 +-
 datafusion/core/src/physical_plan/sorts/sort.rs    | 131 +++++---
 .../physical_plan/sorts/sort_preserving_merge.rs   |  12 +-
 datafusion/core/src/physical_plan/sorts/stream.rs  |  12 +-
 .../core/tests/fuzz_cases/order_spill_fuzz.rs      |  26 +-
 datafusion/core/tests/memory_limit.rs              | 345 ++++++++++++++++++++-
 .../test_files/information_schema.slt              |   2 +
 datafusion/execution/src/config.rs                 |  26 ++
 datafusion/execution/src/disk_manager.rs           |   9 +
 docs/source/user-guide/configs.md                  |   2 +
 14 files changed, 590 insertions(+), 74 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index f681ae57a3..fe7fb95503 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -235,6 +235,22 @@ config_namespace! {
         ///
         /// Defaults to the number of CPU cores on the system
         pub planning_concurrency: usize, default = num_cpus::get()
+
+        /// Specifies the reserved memory for each spillable sort operation to
+        /// facilitate an in-memory merge.
+        ///
+        /// When a sort operation spills to disk, the in-memory data must be
+        /// sorted and merged before being written to a file. This setting 
reserves
+        /// a specific amount of memory for that in-memory sort/merge process.
+        ///
+        /// Note: This setting is irrelevant if the sort operation cannot spill
+        /// (i.e., if there's no `DiskManager` configured).
+        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
+
+        /// When sorting, below what size should data be concatenated
+        /// and sorted in a single RecordBatch rather than sorted in
+        /// batches and merged.
+        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
     }
 }
 
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs 
b/datafusion/core/src/physical_plan/repartition/mod.rs
index 99b72a1b40..3f83e186ea 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -574,14 +574,21 @@ impl ExecutionPlan for RepartitionExec {
 
             // Get existing ordering:
             let sort_exprs = self.input.output_ordering().unwrap_or(&[]);
-            // Merge streams (while preserving ordering) coming from input 
partitions to this partition:
+
+            // Merge streams (while preserving ordering) coming from
+            // input partitions to this partition:
+            let fetch = None;
+            let merge_reservation =
+                MemoryConsumer::new(format!("{}[Merge {partition}]", 
self.name()))
+                    .register(context.memory_pool());
             streaming_merge(
                 input_streams,
                 self.schema(),
                 sort_exprs,
                 BaselineMetrics::new(&self.metrics, partition),
                 context.session_config().batch_size(),
-                None,
+                fetch,
+                merge_reservation,
             )
         } else {
             Ok(Box::pin(RepartitionStream {
diff --git a/datafusion/core/src/physical_plan/sorts/builder.rs 
b/datafusion/core/src/physical_plan/sorts/builder.rs
index 1c5ec356ee..3527d57382 100644
--- a/datafusion/core/src/physical_plan/sorts/builder.rs
+++ b/datafusion/core/src/physical_plan/sorts/builder.rs
@@ -19,6 +19,7 @@ use arrow::compute::interleave;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
 
 #[derive(Debug, Copy, Clone, Default)]
 struct BatchCursor {
@@ -37,6 +38,9 @@ pub struct BatchBuilder {
     /// Maintain a list of [`RecordBatch`] and their corresponding stream
     batches: Vec<(usize, RecordBatch)>,
 
+    /// Accounts for memory used by buffered batches
+    reservation: MemoryReservation,
+
     /// The current [`BatchCursor`] for each stream
     cursors: Vec<BatchCursor>,
 
@@ -47,23 +51,31 @@ pub struct BatchBuilder {
 
 impl BatchBuilder {
     /// Create a new [`BatchBuilder`] with the provided `stream_count` and 
`batch_size`
-    pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> 
Self {
+    pub fn new(
+        schema: SchemaRef,
+        stream_count: usize,
+        batch_size: usize,
+        reservation: MemoryReservation,
+    ) -> Self {
         Self {
             schema,
             batches: Vec::with_capacity(stream_count * 2),
             cursors: vec![BatchCursor::default(); stream_count],
             indices: Vec::with_capacity(batch_size),
+            reservation,
         }
     }
 
     /// Append a new batch in `stream_idx`
-    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
+    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> 
Result<()> {
+        self.reservation.try_grow(batch.get_array_memory_size())?;
         let batch_idx = self.batches.len();
         self.batches.push((stream_idx, batch));
         self.cursors[stream_idx] = BatchCursor {
             batch_idx,
             row_idx: 0,
-        }
+        };
+        Ok(())
     }
 
     /// Append the next row from `stream_idx`
@@ -119,7 +131,7 @@ impl BatchBuilder {
         // We can therefore drop all but the last batch for each stream
         let mut batch_idx = 0;
         let mut retained = 0;
-        self.batches.retain(|(stream_idx, _)| {
+        self.batches.retain(|(stream_idx, batch)| {
             let stream_cursor = &mut self.cursors[*stream_idx];
             let retain = stream_cursor.batch_idx == batch_idx;
             batch_idx += 1;
@@ -127,6 +139,8 @@ impl BatchBuilder {
             if retain {
                 stream_cursor.batch_idx = retained;
                 retained += 1;
+            } else {
+                self.reservation.shrink(batch.get_array_memory_size());
             }
             retain
         });
diff --git a/datafusion/core/src/physical_plan/sorts/cursor.rs 
b/datafusion/core/src/physical_plan/sorts/cursor.rs
index a9e5122130..c0c7912886 100644
--- a/datafusion/core/src/physical_plan/sorts/cursor.rs
+++ b/datafusion/core/src/physical_plan/sorts/cursor.rs
@@ -21,6 +21,7 @@ use arrow::datatypes::ArrowNativeTypeOp;
 use arrow::row::{Row, Rows};
 use arrow_array::types::ByteArrayType;
 use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
+use datafusion_execution::memory_pool::MemoryReservation;
 use std::cmp::Ordering;
 
 /// A [`Cursor`] for [`Rows`]
@@ -29,6 +30,11 @@ pub struct RowCursor {
     num_rows: usize,
 
     rows: Rows,
+
+    /// Tracks the memory used by the `Rows` of this
+    /// cursor. Freed on drop
+    #[allow(dead_code)]
+    reservation: MemoryReservation,
 }
 
 impl std::fmt::Debug for RowCursor {
@@ -41,12 +47,22 @@ impl std::fmt::Debug for RowCursor {
 }
 
 impl RowCursor {
-    /// Create a new SortKeyCursor
-    pub fn new(rows: Rows) -> Self {
+    /// Create a new [`RowCursor`] from `rows` and a `reservation`
+    /// that tracks its memory.
+    ///
+    /// Panics if the reservation is not for exactly `rows.size()`
+    /// bytes
+    pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
+        assert_eq!(
+            rows.size(),
+            reservation.size(),
+            "memory reservation mismatch"
+        );
         Self {
             cur_row: 0,
             num_rows: rows.num_rows(),
             rows,
+            reservation,
         }
     }
 
diff --git a/datafusion/core/src/physical_plan/sorts/merge.rs 
b/datafusion/core/src/physical_plan/sorts/merge.rs
index 736df7dbe8..f8a1457dd6 100644
--- a/datafusion/core/src/physical_plan/sorts/merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/merge.rs
@@ -31,6 +31,7 @@ use arrow::datatypes::{DataType, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow_array::*;
 use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
 use futures::Stream;
 use std::pin::Pin;
 use std::task::{ready, Context, Poll};
@@ -42,7 +43,7 @@ macro_rules! primitive_merge_helper {
 }
 
 macro_rules! merge_helper {
-    ($t:ty, $sort:ident, $streams:ident, $schema:ident, 
$tracking_metrics:ident, $batch_size:ident, $fetch:ident) => {{
+    ($t:ty, $sort:ident, $streams:ident, $schema:ident, 
$tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) 
=> {{
         let streams = FieldCursorStream::<$t>::new($sort, $streams);
         return Ok(Box::pin(SortPreservingMergeStream::new(
             Box::new(streams),
@@ -50,6 +51,7 @@ macro_rules! merge_helper {
             $tracking_metrics,
             $batch_size,
             $fetch,
+            $reservation,
         )));
     }};
 }
@@ -63,28 +65,36 @@ pub fn streaming_merge(
     metrics: BaselineMetrics,
     batch_size: usize,
     fetch: Option<usize>,
+    reservation: MemoryReservation,
 ) -> Result<SendableRecordBatchStream> {
     // Special case single column comparisons with optimized cursor 
implementations
     if expressions.len() == 1 {
         let sort = expressions[0].clone();
         let data_type = sort.expr.data_type(schema.as_ref())?;
         downcast_primitive! {
-            data_type => (primitive_merge_helper, sort, streams, schema, 
metrics, batch_size, fetch),
-            DataType::Utf8 => merge_helper!(StringArray, sort, streams, 
schema, metrics, batch_size, fetch)
-            DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, 
streams, schema, metrics, batch_size, fetch)
-            DataType::Binary => merge_helper!(BinaryArray, sort, streams, 
schema, metrics, batch_size, fetch)
-            DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, 
streams, schema, metrics, batch_size, fetch)
+            data_type => (primitive_merge_helper, sort, streams, schema, 
metrics, batch_size, fetch, reservation),
+            DataType::Utf8 => merge_helper!(StringArray, sort, streams, 
schema, metrics, batch_size, fetch, reservation)
+            DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, 
streams, schema, metrics, batch_size, fetch, reservation)
+            DataType::Binary => merge_helper!(BinaryArray, sort, streams, 
schema, metrics, batch_size, fetch, reservation)
+            DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, 
streams, schema, metrics, batch_size, fetch, reservation)
             _ => {}
         }
     }
 
-    let streams = RowCursorStream::try_new(schema.as_ref(), expressions, 
streams)?;
+    let streams = RowCursorStream::try_new(
+        schema.as_ref(),
+        expressions,
+        streams,
+        reservation.new_empty(),
+    )?;
+
     Ok(Box::pin(SortPreservingMergeStream::new(
         Box::new(streams),
         schema,
         metrics,
         batch_size,
         fetch,
+        reservation,
     )))
 }
 
@@ -162,11 +172,12 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
         metrics: BaselineMetrics,
         batch_size: usize,
         fetch: Option<usize>,
+        reservation: MemoryReservation,
     ) -> Self {
         let stream_count = streams.partitions();
 
         Self {
-            in_progress: BatchBuilder::new(schema, stream_count, batch_size),
+            in_progress: BatchBuilder::new(schema, stream_count, batch_size, 
reservation),
             streams,
             metrics,
             aborted: false,
@@ -197,8 +208,7 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
             Some(Err(e)) => Poll::Ready(Err(e)),
             Some(Ok((cursor, batch))) => {
                 self.cursors[idx] = Some(cursor);
-                self.in_progress.push_batch(idx, batch);
-                Poll::Ready(Ok(()))
+                Poll::Ready(self.in_progress.push_batch(idx, batch))
             }
         }
     }
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs 
b/datafusion/core/src/physical_plan/sorts/sort.rs
index c7ae09bb2e..411a425b51 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -210,23 +210,37 @@ struct ExternalSorter {
     /// If Some, the maximum number of output rows that will be
     /// produced.
     fetch: Option<usize>,
-    /// Memory usage tracking
+    /// Reservation for in_mem_batches
     reservation: MemoryReservation,
-    /// The partition id that this Sort is handling (for identification)
-    partition_id: usize,
-    /// A handle to the runtime to get Disk spill files
+    /// Reservation for the merging of in-memory batches. If the sort
+    /// might spill, `sort_spill_reservation_bytes` will be
+    /// pre-reserved to ensure there is some space for this sort/merge.
+    merge_reservation: MemoryReservation,
+    /// A handle to the runtime to get spill files
     runtime: Arc<RuntimeEnv>,
     /// The target number of rows for output batches
     batch_size: usize,
+    /// How much memory to reserve for performing in-memory sort/merges
+    /// prior to spilling.
+    sort_spill_reservation_bytes: usize,
+    /// If the size of the buffered in-memory batches is below this size,
+    /// the data will be concatenated and sorted in place rather than
+    /// sort/merged.
+    sort_in_place_threshold_bytes: usize,
 }
 
 impl ExternalSorter {
+    // TODO: make a builder or some other nicer API to avoid the
+    // clippy warning
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         partition_id: usize,
         schema: SchemaRef,
         expr: Vec<PhysicalSortExpr>,
         batch_size: usize,
         fetch: Option<usize>,
+        sort_spill_reservation_bytes: usize,
+        sort_in_place_threshold_bytes: usize,
         metrics: &ExecutionPlanMetricsSet,
         runtime: Arc<RuntimeEnv>,
     ) -> Self {
@@ -235,6 +249,10 @@ impl ExternalSorter {
             .with_can_spill(true)
             .register(&runtime.memory_pool);
 
+        let merge_reservation =
+            MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
+                .register(&runtime.memory_pool);
+
         Self {
             schema,
             in_mem_batches: vec![],
@@ -244,9 +262,11 @@ impl ExternalSorter {
             metrics,
             fetch,
             reservation,
-            partition_id,
+            merge_reservation,
             runtime,
             batch_size,
+            sort_spill_reservation_bytes,
+            sort_in_place_threshold_bytes,
         }
     }
 
@@ -257,6 +277,7 @@ impl ExternalSorter {
         if input.num_rows() == 0 {
             return Ok(());
         }
+        self.reserve_memory_for_merge()?;
 
         let size = batch_byte_size(&input);
         if self.reservation.try_grow(size).is_err() {
@@ -318,12 +339,10 @@ impl ExternalSorter {
                 self.metrics.baseline.clone(),
                 self.batch_size,
                 self.fetch,
+                self.reservation.new_empty(),
             )
         } else if !self.in_mem_batches.is_empty() {
-            let result = 
self.in_mem_sort_stream(self.metrics.baseline.clone());
-            // Report to the memory manager we are no longer using memory
-            self.reservation.free();
-            result
+            self.in_mem_sort_stream(self.metrics.baseline.clone())
         } else {
             Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
         }
@@ -374,6 +393,11 @@ impl ExternalSorter {
             return Ok(());
         }
 
+        // Release the memory reserved for merge back to the pool so
+        // there is some left when `in_mem_sort_stream` requests an
+        // allocation.
+        self.merge_reservation.free();
+
         self.in_mem_batches = self
             .in_mem_sort_stream(self.metrics.baseline.intermediate())?
             .try_collect()
@@ -385,7 +409,10 @@ impl ExternalSorter {
             .map(|x| x.get_array_memory_size())
             .sum();
 
-        self.reservation.resize(size);
+        // Reserve headroom for next sort/merge
+        self.reserve_memory_for_merge()?;
+
+        self.reservation.try_resize(size)?;
         self.in_mem_batches_sorted = true;
         Ok(())
     }
@@ -455,26 +482,27 @@ impl ExternalSorter {
         assert_ne!(self.in_mem_batches.len(), 0);
         if self.in_mem_batches.len() == 1 {
             let batch = self.in_mem_batches.remove(0);
-            let stream = self.sort_batch_stream(batch, metrics)?;
-            self.in_mem_batches.clear();
-            return Ok(stream);
+            let reservation = self.reservation.take();
+            return self.sort_batch_stream(batch, metrics, reservation);
         }
 
-        // If less than 1MB of in-memory data, concatenate and sort in place
-        //
-        // This is a very rough heuristic and likely could be refined further
-        if self.reservation.size() < 1048576 {
+        // If less than sort_in_place_threshold_bytes, concatenate and sort in 
place
+        if self.reservation.size() < self.sort_in_place_threshold_bytes {
             // Concatenate memory batches together and sort
             let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
             self.in_mem_batches.clear();
-            return self.sort_batch_stream(batch, metrics);
+            self.reservation.try_resize(batch.get_array_memory_size())?;
+            let reservation = self.reservation.take();
+            return self.sort_batch_stream(batch, metrics, reservation);
         }
 
         let streams = std::mem::take(&mut self.in_mem_batches)
             .into_iter()
             .map(|batch| {
                 let metrics = self.metrics.baseline.intermediate();
-                Ok(spawn_buffered(self.sort_batch_stream(batch, metrics)?, 1))
+                let reservation = 
self.reservation.split(batch.get_array_memory_size());
+                let input = self.sort_batch_stream(batch, metrics, 
reservation)?;
+                Ok(spawn_buffered(input, 1))
             })
             .collect::<Result<_>>()?;
 
@@ -485,35 +513,49 @@ impl ExternalSorter {
             metrics,
             self.batch_size,
             self.fetch,
+            self.merge_reservation.new_empty(),
         )
     }
 
-    /// Sorts a single `RecordBatch` into a single stream
+    /// Sorts a single `RecordBatch` into a single stream.
+    ///
+    /// `reservation` accounts for the memory used by this batch and
+    /// is released when the sort is complete
     fn sort_batch_stream(
         &self,
         batch: RecordBatch,
         metrics: BaselineMetrics,
+        reservation: MemoryReservation,
     ) -> Result<SendableRecordBatchStream> {
+        assert_eq!(batch.get_array_memory_size(), reservation.size());
         let schema = batch.schema();
 
-        let mut reservation =
-            MemoryConsumer::new(format!("sort_batch_stream{}", 
self.partition_id))
-                .register(&self.runtime.memory_pool);
-
-        // TODO: This should probably be try_grow (#5885)
-        reservation.resize(batch.get_array_memory_size());
-
         let fetch = self.fetch;
         let expressions = self.expr.clone();
         let stream = futures::stream::once(futures::future::lazy(move |_| {
             let sorted = sort_batch(&batch, &expressions, fetch)?;
             metrics.record_output(sorted.num_rows());
             drop(batch);
-            reservation.free();
+            drop(reservation);
             Ok(sorted)
         }));
         Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
     }
+
+    /// If this sort may spill, pre-allocates
+    /// `sort_spill_reservation_bytes` of memory to guarantee memory is
+    /// left for the in-memory sort/merge.
+    fn reserve_memory_for_merge(&mut self) -> Result<()> {
+        // Reserve headroom for next merge sort
+        if self.runtime.disk_manager.tmp_files_enabled() {
+            let size = self.sort_spill_reservation_bytes;
+            if self.merge_reservation.size() != size {
+                self.merge_reservation.try_resize(size)?;
+            }
+        }
+
+        Ok(())
+    }
 }
 
 impl Debug for ExternalSorter {
@@ -801,6 +843,8 @@ impl ExecutionPlan for SortExec {
 
         let mut input = self.input.execute(partition, context.clone())?;
 
+        let execution_options = &context.session_config().options().execution;
+
         trace!("End SortExec's input.execute for partition: {}", partition);
 
         let mut sorter = ExternalSorter::new(
@@ -809,6 +853,8 @@ impl ExecutionPlan for SortExec {
             self.expr.clone(),
             context.session_config().batch_size(),
             self.fetch,
+            execution_options.sort_spill_reservation_bytes,
+            execution_options.sort_in_place_threshold_bytes,
             &self.metrics_set,
             context.runtime_env(),
         );
@@ -914,9 +960,15 @@ mod tests {
     #[tokio::test]
     async fn test_sort_spill() -> Result<()> {
         // trigger spill there will be 4 batches with 5.5KB for each
-        let config = RuntimeConfig::new().with_memory_limit(12288, 1.0);
-        let runtime = Arc::new(RuntimeEnv::new(config)?);
-        let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), 
runtime);
+        let session_config = SessionConfig::new();
+        let sort_spill_reservation_bytes = session_config
+            .options()
+            .execution
+            .sort_spill_reservation_bytes;
+        let rt_config = RuntimeConfig::new()
+            .with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0);
+        let runtime = Arc::new(RuntimeEnv::new(rt_config)?);
+        let session_ctx = SessionContext::with_config_rt(session_config, 
runtime);
 
         let partitions = 4;
         let csv = test::scan_partitioned_csv(partitions)?;
@@ -996,11 +1048,18 @@ mod tests {
         ];
 
         for (fetch, expect_spillage) in test_options {
-            let config = RuntimeConfig::new()
-                .with_memory_limit(avg_batch_size * (partitions - 1), 1.0);
-            let runtime = Arc::new(RuntimeEnv::new(config)?);
-            let session_ctx =
-                SessionContext::with_config_rt(SessionConfig::new(), runtime);
+            let session_config = SessionConfig::new();
+            let sort_spill_reservation_bytes = session_config
+                .options()
+                .execution
+                .sort_spill_reservation_bytes;
+
+            let rt_config = RuntimeConfig::new().with_memory_limit(
+                sort_spill_reservation_bytes + avg_batch_size * (partitions - 
1),
+                1.0,
+            );
+            let runtime = Arc::new(RuntimeEnv::new(rt_config)?);
+            let session_ctx = SessionContext::with_config_rt(session_config, 
runtime);
 
             let csv = test::scan_partitioned_csv(partitions)?;
             let schema = csv.schema();
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs 
b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index e8d571631b..6b978b5ee7 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -30,6 +30,7 @@ use crate::physical_plan::{
     DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
     SendableRecordBatchStream, Statistics,
 };
+use datafusion_execution::memory_pool::MemoryConsumer;
 
 use arrow::datatypes::SchemaRef;
 use datafusion_common::{DataFusionError, Result};
@@ -213,6 +214,10 @@ impl ExecutionPlan for SortPreservingMergeExec {
         );
         let schema = self.schema();
 
+        let reservation =
+            
MemoryConsumer::new(format!("SortPreservingMergeExec[{partition}]"))
+                .register(&context.runtime_env().memory_pool);
+
         match input_partitions {
             0 => Err(DataFusionError::Internal(
                 "SortPreservingMergeExec requires at least one input partition"
@@ -241,6 +246,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
                     BaselineMetrics::new(&self.metrics, partition),
                     context.session_config().batch_size(),
                     self.fetch,
+                    reservation,
                 )?;
 
                 debug!("Got stream result from 
SortPreservingMergeStream::new_from_receivers");
@@ -843,14 +849,18 @@ mod tests {
         }
 
         let metrics = ExecutionPlanMetricsSet::new();
+        let reservation =
+            
MemoryConsumer::new("test").register(&task_ctx.runtime_env().memory_pool);
 
+        let fetch = None;
         let merge_stream = streaming_merge(
             streams,
             batches.schema(),
             sort.as_slice(),
             BaselineMetrics::new(&metrics, 0),
             task_ctx.session_config().batch_size(),
-            None,
+            fetch,
+            reservation,
         )
         .unwrap();
 
diff --git a/datafusion/core/src/physical_plan/sorts/stream.rs 
b/datafusion/core/src/physical_plan/sorts/stream.rs
index 97a3b85fa5..9ef13b7eb2 100644
--- a/datafusion/core/src/physical_plan/sorts/stream.rs
+++ b/datafusion/core/src/physical_plan/sorts/stream.rs
@@ -23,6 +23,7 @@ use arrow::datatypes::Schema;
 use arrow::record_batch::RecordBatch;
 use arrow::row::{RowConverter, SortField};
 use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
 use futures::stream::{Fuse, StreamExt};
 use std::marker::PhantomData;
 use std::sync::Arc;
@@ -84,6 +85,8 @@ pub struct RowCursorStream {
     column_expressions: Vec<Arc<dyn PhysicalExpr>>,
     /// Input streams
     streams: FusedStreams,
+    /// Tracks the memory used by `converter`
+    reservation: MemoryReservation,
 }
 
 impl RowCursorStream {
@@ -91,6 +94,7 @@ impl RowCursorStream {
         schema: &Schema,
         expressions: &[PhysicalSortExpr],
         streams: Vec<SendableRecordBatchStream>,
+        reservation: MemoryReservation,
     ) -> Result<Self> {
         let sort_fields = expressions
             .iter()
@@ -104,6 +108,7 @@ impl RowCursorStream {
         let converter = RowConverter::new(sort_fields)?;
         Ok(Self {
             converter,
+            reservation,
             column_expressions: expressions.iter().map(|x| 
x.expr.clone()).collect(),
             streams: FusedStreams(streams),
         })
@@ -117,7 +122,12 @@ impl RowCursorStream {
             .collect::<Result<Vec<_>>>()?;
 
         let rows = self.converter.convert_columns(&cols)?;
-        Ok(RowCursor::new(rows))
+        self.reservation.try_resize(self.converter.size())?;
+
+        // track the memory in the newly created Rows.
+        let mut rows_reservation = self.reservation.new_empty();
+        rows_reservation.try_grow(rows.size())?;
+        Ok(RowCursor::new(rows, rows_reservation))
     }
 }
 
diff --git a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs
index 1f72e0fcb4..d927b2807d 100644
--- a/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs
@@ -22,13 +22,13 @@ use arrow::{
     compute::SortOptions,
     record_batch::RecordBatch,
 };
-use datafusion::execution::memory_pool::GreedyMemoryPool;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_execution::memory_pool::GreedyMemoryPool;
 use rand::Rng;
 use std::sync::Arc;
 use test_utils::{batches_to_vec, partitions_to_sorted_vec};
@@ -76,10 +76,20 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, 
bool)>) {
         let exec = MemoryExec::try_new(&input, schema, None).unwrap();
         let sort = Arc::new(SortExec::new(sort, Arc::new(exec)));
 
+        let session_config = SessionConfig::new();
+        // Make sure there is enough space for the initial spill
+        // reservation
+        let pool_size = pool_size.saturating_add(
+            session_config
+                .options()
+                .execution
+                .sort_spill_reservation_bytes,
+        );
+
         let runtime_config = RuntimeConfig::new()
             .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)));
         let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
-        let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), 
runtime);
+        let session_ctx = SessionContext::with_config_rt(session_config, 
runtime);
 
         let task_ctx = session_ctx.task_ctx();
         let collected = collect(sort.clone(), task_ctx).await.unwrap();
@@ -88,9 +98,17 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, 
bool)>) {
         let actual = batches_to_vec(&collected);
 
         if spill {
-            assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0);
+            assert_ne!(
+                sort.metrics().unwrap().spill_count().unwrap(),
+                0,
+                "{pool_size} {size}"
+            );
         } else {
-            assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
+            assert_eq!(
+                sort.metrics().unwrap().spill_count().unwrap(),
+                0,
+                "{pool_size} {size}"
+            );
         }
 
         assert_eq!(
diff --git a/datafusion/core/tests/memory_limit.rs 
b/datafusion/core/tests/memory_limit.rs
index a7cff6cbd7..80bbbed8f0 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -17,12 +17,21 @@
 
 //! This module contains tests for limiting memory at runtime in DataFusion
 
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{Int32Type, SchemaRef};
 use arrow::record_batch::RecordBatch;
+use arrow_array::{ArrayRef, DictionaryArray};
+use arrow_schema::SortOptions;
+use async_trait::async_trait;
+use datafusion::assert_batches_eq;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::common::batch_byte_size;
+use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::streaming::PartitionStream;
+use datafusion_expr::{Expr, TableType};
+use datafusion_physical_expr::PhysicalSortExpr;
 use futures::StreamExt;
-use std::sync::Arc;
+use std::any::Any;
+use std::sync::{Arc, OnceLock};
 
 use datafusion::datasource::streaming::StreamingTable;
 use datafusion::datasource::{MemTable, TableProvider};
@@ -31,8 +40,8 @@ use datafusion::execution::disk_manager::DiskManagerConfig;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::physical_optimizer::join_selection::JoinSelection;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion::physical_plan::SendableRecordBatchStream;
-use datafusion_common::assert_contains;
+use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
+use datafusion_common::{assert_contains, Result};
 
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_execution::TaskContext;
@@ -196,6 +205,110 @@ async fn symmetric_hash_join() {
     .await
 }
 
+#[tokio::test]
+async fn sort_preserving_merge() {
+    let partition_size = batches_byte_size(&dict_batches());
+
+    TestCase::new(
+        // This query uses the exact same ordering as the input table
+        // so only a merge is needed
+        "select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10",
+        vec![
+            "Resources exhausted: Failed to allocate additional",
+            "SortPreservingMergeExec",
+        ],
+        // provide insufficient memory to merge
+        partition_size / 2,
+    )
+        // two partitions of data, so a merge is required
+        .with_scenario(Scenario::DictionaryStrings(2))
+        .with_expected_plan(
+            // It is important that this plan only has
+            // SortPreservingMergeExec (not a Sort which would compete
+            // with the SortPreservingMergeExec for memory)
+            &[
+                
"+---------------+-------------------------------------------------------------------------------------------------------------+",
+                "| plan_type     | plan                                        
                                                                |",
+                
"+---------------+-------------------------------------------------------------------------------------------------------------+",
+                "| logical_plan  | Limit: skip=0, fetch=10                     
                                                                |",
+                "|               |   Sort: t.a ASC NULLS LAST, t.b ASC NULLS 
LAST, fetch=10                                                    |",
+                "|               |     TableScan: t projection=[a, b]          
                                                                |",
+                "| physical_plan | GlobalLimitExec: skip=0, fetch=10           
                                                                |",
+                "|               |   SortPreservingMergeExec: [a@0 ASC NULLS 
LAST,b@1 ASC NULLS LAST], fetch=10                                |",
+                "|               |     MemoryExec: partitions=2, 
partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST 
|",
+                "|               |                                             
                                                                |",
+                
"+---------------+-------------------------------------------------------------------------------------------------------------+",
+            ]
+        )
+        .run()
+        .await
+}
+
+#[tokio::test]
+async fn sort_spill_reservation() {
+    let partition_size = batches_byte_size(&dict_batches());
+
+    let base_config = SessionConfig::new()
+        // do not allow the sort to use the 'concat in place' path
+        .with_sort_in_place_threshold_bytes(10);
+
+    // This test case shows how sort_spill_reservation works by
+    // purposely sorting data that requires non trivial memory to
+    // sort/merge.
+    let test = TestCase::new(
+        // This query uses a different order than the input table to
+        // force a sort. It also needs to have multiple columns to
+        // force RowFormat / interner that makes merge require
+        // substantial memory
+        "select * from t ORDER BY a , b DESC",
+        vec![], // expected errors set below
+        // enough memory to sort if we don't try to merge it all at once
+        (partition_size * 5) / 2,
+    )
+        // use a single partition so only a sort is needed
+        .with_scenario(Scenario::DictionaryStrings(1))
+        .with_disk_manager_config(DiskManagerConfig::NewOs)
+        .with_expected_plan(
+            // It is important that this plan only has a SortExec, not
+            // also merge, so we can ensure the sort could finish
+            // given enough merging memory
+            &[
+    
"+---------------+--------------------------------------------------------------------------------------------------------+",
+    "| plan_type     | plan                                                    
                                               |",
+    
"+---------------+--------------------------------------------------------------------------------------------------------+",
+    "| logical_plan  | Sort: t.a ASC NULLS LAST, t.b DESC NULLS FIRST          
                                               |",
+    "|               |   TableScan: t projection=[a, b]                        
                                               |",
+    "| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC]            
                                               |",
+    "|               |   MemoryExec: partitions=1, partition_sizes=[5], 
output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
+    "|               |                                                         
                                               |",
+    
"+---------------+--------------------------------------------------------------------------------------------------------+",
+            ]
+        );
+
+    let config = base_config
+        .clone()
+        // provide insufficient reserved space for merging,
+        // the sort will fail while trying to merge
+        .with_sort_spill_reservation_bytes(1024);
+
+    test.clone()
+        .with_expected_errors(vec![
+            "Resources exhausted: Failed to allocate additional",
+            "ExternalSorterMerge", // merging in sort fails
+        ])
+        .with_config(config)
+        .run()
+        .await;
+
+    let config = base_config
+        // this time, reserve sufficient space up front for the merge,
+        // which will force the spills to happen with less buffered
+        // input and thus leave enough memory to merge.
+        .with_sort_spill_reservation_bytes(2 * partition_size);
+
+    test.with_config(config).with_expected_success().run().await;
+}
+
 /// Run the query with the specified memory limit,
 /// and verifies the expected errors are returned
 #[derive(Clone, Debug)]
@@ -205,9 +318,17 @@ struct TestCase {
     memory_limit: usize,
     config: SessionConfig,
     scenario: Scenario,
+    /// How should the disk manager (that allows spilling) be
+    /// configured? Defaults to `Disabled`
+    disk_manager_config: DiskManagerConfig,
+    /// Expected explain plan, if non-empty
+    expected_plan: Vec<String>,
+    /// Is the plan expected to pass? Defaults to false
+    expected_success: bool,
 }
 
 impl TestCase {
+    // TODO remove expected errors and memory limits and query from constructor
     fn new<'a>(
         query: impl Into<String>,
         expected_errors: impl IntoIterator<Item = &'a str>,
@@ -222,21 +343,56 @@ impl TestCase {
             memory_limit,
             config: SessionConfig::new(),
             scenario: Scenario::AccessLog,
+            disk_manager_config: DiskManagerConfig::Disabled,
+            expected_plan: vec![],
+            expected_success: false,
         }
     }
 
+    /// Set a list of expected strings that must appear in any errors
+    fn with_expected_errors<'a>(
+        mut self,
+        expected_errors: impl IntoIterator<Item = &'a str>,
+    ) -> Self {
+        self.expected_errors =
+            expected_errors.into_iter().map(|s| s.to_string()).collect();
+        self
+    }
+
     /// Specify the configuration to use
     pub fn with_config(mut self, config: SessionConfig) -> Self {
         self.config = config;
         self
     }
 
+    /// Mark that the test expects the query to run successfully
+    pub fn with_expected_success(mut self) -> Self {
+        self.expected_success = true;
+        self
+    }
+
     /// Specify the scenario to run
     pub fn with_scenario(mut self, scenario: Scenario) -> Self {
         self.scenario = scenario;
         self
     }
 
+    /// Specify how the disk manager should be configured. If enabled,
+    /// operators that support it can spill
+    pub fn with_disk_manager_config(
+        mut self,
+        disk_manager_config: DiskManagerConfig,
+    ) -> Self {
+        self.disk_manager_config = disk_manager_config;
+        self
+    }
+
+    /// Specify an expected plan to review
+    pub fn with_expected_plan(mut self, expected_plan: &[&str]) -> Self {
+        self.expected_plan = expected_plan.iter().map(|s| 
s.to_string()).collect();
+        self
+    }
+
     /// Run the test, panic'ing on error
     async fn run(self) {
         let Self {
@@ -245,33 +401,62 @@ impl TestCase {
             memory_limit,
             config,
             scenario,
+            disk_manager_config,
+            expected_plan,
+            expected_success,
         } = self;
 
         let table = scenario.table();
 
         let rt_config = RuntimeConfig::new()
             // do not allow spilling
-            .with_disk_manager(DiskManagerConfig::Disabled)
+            .with_disk_manager(disk_manager_config)
             .with_memory_limit(memory_limit, MEMORY_FRACTION);
 
         let runtime = RuntimeEnv::new(rt_config).unwrap();
 
         // Configure execution
-        let state = SessionState::with_config_rt(config, Arc::new(runtime))
-            .with_physical_optimizer_rules(scenario.rules());
+        let state = SessionState::with_config_rt(config, Arc::new(runtime));
+        let state = match scenario.rules() {
+            Some(rules) => state.with_physical_optimizer_rules(rules),
+            None => state,
+        };
 
         let ctx = SessionContext::with_state(state);
         ctx.register_table("t", table).expect("registering table");
 
         let df = ctx.sql(&query).await.expect("Planning query");
 
+        if !expected_plan.is_empty() {
+            let expected_plan: Vec<_> =
+                expected_plan.iter().map(|s| s.as_str()).collect();
+            let actual_plan = df
+                .clone()
+                .explain(false, false)
+                .unwrap()
+                .collect()
+                .await
+                .unwrap();
+            assert_batches_eq!(expected_plan, &actual_plan);
+        }
+
         match df.collect().await {
             Ok(_batches) => {
-                panic!("Unexpected success when running, expected memory limit 
failure")
+                if !expected_success {
+                    panic!(
+                        "Unexpected success when running, expected memory 
limit failure"
+                    )
+                }
             }
             Err(e) => {
-                for error_substring in expected_errors {
-                    assert_contains!(e.to_string(), error_substring);
+                if expected_success {
+                    panic!(
+                        "Unexpected failure when running, expected success but 
got: {e}"
+                    )
+                } else {
+                    for error_substring in expected_errors {
+                        assert_contains!(e.to_string(), error_substring);
+                    }
                 }
             }
         }
@@ -290,6 +475,9 @@ enum Scenario {
     /// 1000 rows of access log data with batches of 50 rows in a
     /// [`StreamingTable`]
     AccessLogStreaming,
+
+    /// N partitions of sorted, dictionary encoded strings
+    DictionaryStrings(usize),
 }
 
 impl Scenario {
@@ -317,24 +505,53 @@ impl Scenario {
                 .with_infinite_table(true);
                 Arc::new(table)
             }
+            Self::DictionaryStrings(num_partitions) => {
+                use datafusion::physical_expr::expressions::col;
+                let batches: Vec<Vec<_>> = std::iter::repeat(dict_batches())
+                    .take(*num_partitions)
+                    .collect();
+
+                let schema = batches[0][0].schema();
+                let options = SortOptions {
+                    descending: false,
+                    nulls_first: false,
+                };
+                let sort_information = vec![
+                    PhysicalSortExpr {
+                        expr: col("a", &schema).unwrap(),
+                        options,
+                    },
+                    PhysicalSortExpr {
+                        expr: col("b", &schema).unwrap(),
+                        options,
+                    },
+                ];
+
+                let table = SortedTableProvider::new(batches, 
sort_information);
+                Arc::new(table)
+            }
         }
     }
 
-    /// return the optimizer rules to use
-    fn rules(&self) -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
+    /// return specific physical optimizer rules to use
+    fn rules(&self) -> Option<Vec<Arc<dyn PhysicalOptimizerRule + Send + 
Sync>>> {
         match self {
             Self::AccessLog => {
                 // Disabling physical optimizer rules to avoid sorts /
                 // repartitions (since RepartitionExec / SortExec also
                 // has a memory budget which we'll likely hit first)
-                vec![]
+                Some(vec![])
             }
             Self::AccessLogStreaming => {
                 // Disable all physical optimizer rules except the
                 // JoinSelection rule to avoid sorts or repartition,
                 // as they also have memory budgets that may be hit
                 // first
-                vec![Arc::new(JoinSelection::new())]
+                Some(vec![Arc::new(JoinSelection::new())])
+            }
+            Self::DictionaryStrings(_) => {
+                // Use default rules
+                None
             }
         }
     }
@@ -347,6 +564,56 @@ fn access_log_batches() -> Vec<RecordBatch> {
         .collect()
 }
 
+static DICT_BATCHES: OnceLock<Vec<RecordBatch>> = OnceLock::new();
+
+/// Returns 5 sorted string dictionary batches each with 50 rows with
+/// this schema.
+///
+/// a: Dictionary<Utf8, Int32>,
+/// b: Dictionary<Utf8, Int32>,
+fn dict_batches() -> Vec<RecordBatch> {
+    DICT_BATCHES.get_or_init(make_dict_batches).clone()
+}
+
+fn make_dict_batches() -> Vec<RecordBatch> {
+    let batch_size = 50;
+
+    let mut i = 0;
+    let gen = std::iter::from_fn(move || {
+        // create values like (first batch shown)
+        // 0000000000
+        // 0000000001
+        // ...
+        // 0000000049
+
+        let values: Vec<_> = (i..i + batch_size).map(|x| 
format!("{x:010}")).collect();
+        //println!("values: \n{values:?}");
+        let array: DictionaryArray<Int32Type> =
+            values.iter().map(|s| s.as_str()).collect();
+        let array = Arc::new(array) as ArrayRef;
+        let batch =
+            RecordBatch::try_from_iter(vec![("a", array.clone()), ("b", 
array)]).unwrap();
+
+        i += batch_size;
+        Some(batch)
+    });
+
+    let num_batches = 5;
+
+    let batches: Vec<_> = gen.take(num_batches).collect();
+
+    batches.iter().enumerate().for_each(|(i, batch)| {
+        println!("Dict batch[{i}] size is: {}", batch_byte_size(batch));
+    });
+
+    batches
+}
+
+// How many bytes does the memory from dict_batches consume?
+fn batches_byte_size(batches: &[RecordBatch]) -> usize {
+    batches.iter().map(batch_byte_size).sum()
+}
+
 struct DummyStreamPartition {
     schema: SchemaRef,
     batches: Vec<RecordBatch>,
@@ -366,3 +633,53 @@ impl PartitionStream for DummyStreamPartition {
         ))
     }
 }
+
+///  Wrapper over a TableProvider that can provide ordering information
+struct SortedTableProvider {
+    schema: SchemaRef,
+    batches: Vec<Vec<RecordBatch>>,
+    sort_information: Vec<PhysicalSortExpr>,
+}
+
+impl SortedTableProvider {
+    fn new(
+        batches: Vec<Vec<RecordBatch>>,
+        sort_information: Vec<PhysicalSortExpr>,
+    ) -> Self {
+        let schema = batches[0][0].schema();
+        Self {
+            schema,
+            batches,
+            sort_information,
+        }
+    }
+}
+
+#[async_trait]
+impl TableProvider for SortedTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mem_exec =
+            MemoryExec::try_new(&self.batches, self.schema(), 
projection.cloned())?
+                .with_sort_information(self.sort_information.clone());
+
+        Ok(Arc::new(mem_exec))
+    }
+}
diff --git 
a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt 
b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index fcb818d5fd..162e208201 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -153,6 +153,8 @@ datafusion.execution.parquet.pushdown_filters false
 datafusion.execution.parquet.reorder_filters false
 datafusion.execution.parquet.skip_metadata true
 datafusion.execution.planning_concurrency 13
+datafusion.execution.sort_in_place_threshold_bytes 1048576
+datafusion.execution.sort_spill_reservation_bytes 10485760
 datafusion.execution.target_partitions 7
 datafusion.execution.time_zone +00:00
 datafusion.explain.logical_plan_only false
diff --git a/datafusion/execution/src/config.rs 
b/datafusion/execution/src/config.rs
index c847849936..44fcc2ab49 100644
--- a/datafusion/execution/src/config.rs
+++ b/datafusion/execution/src/config.rs
@@ -287,6 +287,32 @@ impl SessionConfig {
         self.options.optimizer.enable_round_robin_repartition
     }
 
+    /// Set the size of [`sort_spill_reservation_bytes`] to control
+    /// memory pre-reservation
+    ///
+    /// [`sort_spill_reservation_bytes`]: 
datafusion_common::config::ExecutionOptions::sort_spill_reservation_bytes
+    pub fn with_sort_spill_reservation_bytes(
+        mut self,
+        sort_spill_reservation_bytes: usize,
+    ) -> Self {
+        self.options.execution.sort_spill_reservation_bytes =
+            sort_spill_reservation_bytes;
+        self
+    }
+
+    /// Set the size of [`sort_in_place_threshold_bytes`]: the size below
+    /// which data is concatenated and sorted in a single RecordBatch.
+    ///
+    /// [`sort_in_place_threshold_bytes`]: 
datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes
+    pub fn with_sort_in_place_threshold_bytes(
+        mut self,
+        sort_in_place_threshold_bytes: usize,
+    ) -> Self {
+        self.options.execution.sort_in_place_threshold_bytes =
+            sort_in_place_threshold_bytes;
+        self
+    }
+
     /// Convert configuration options to name-value pairs with values
     /// converted to strings.
     ///
diff --git a/datafusion/execution/src/disk_manager.rs 
b/datafusion/execution/src/disk_manager.rs
index 107c58fbe3..e8d2ed9cc0 100644
--- a/datafusion/execution/src/disk_manager.rs
+++ b/datafusion/execution/src/disk_manager.rs
@@ -102,6 +102,13 @@ impl DiskManager {
         }
     }
 
+    /// Return true if this disk manager supports creating temporary
+    /// files. If this returns false, any call to `create_tmp_file`
+    /// will error.
+    pub fn tmp_files_enabled(&self) -> bool {
+        self.local_dirs.lock().is_some()
+    }
+
     /// Return a temporary file from a randomized choice in the configured 
locations
     ///
     /// If the file can not be created for some reason, returns an
@@ -198,6 +205,7 @@ mod tests {
         );
 
         let dm = DiskManager::try_new(config)?;
+        assert!(dm.tmp_files_enabled());
         let actual = dm.create_tmp_file("Testing")?;
 
         // the file should be in one of the specified local directories
@@ -210,6 +218,7 @@ mod tests {
     fn test_disabled_disk_manager() {
         let config = DiskManagerConfig::Disabled;
         let manager = DiskManager::try_new(config).unwrap();
+        assert!(!manager.tmp_files_enabled());
         assert_eq!(
             manager.create_tmp_file("Testing").unwrap_err().to_string(),
             "Resources exhausted: Memory Exhausted while Testing (DiskManager 
is disabled)",
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index bff7cb4da0..63c9c064bc 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -57,6 +57,8 @@ Environment variables are read during `SessionConfig` 
initialisation so they mus
 | datafusion.execution.parquet.reorder_filters               | false      | If 
true, filter expressions evaluated during the parquet decoding operation will 
be reordered heuristically to minimize the cost of evaluation. If false, the 
filters are applied in the same order as written in the query                   
                                                                                
                                                                                
                   [...]
 | datafusion.execution.aggregate.scalar_update_factor        | 10         | 
Specifies the threshold for using `ScalarValue`s to update accumulators during 
high-cardinality aggregations for each input batch. The aggregation is 
considered high-cardinality if the number of affected groups is greater than or 
equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are 
utilized for updating accumulators, rather than the default batch-slice 
approach. This can lead to perform [...]
 | datafusion.execution.planning_concurrency                  | 0          | 
Fan-out during initial physical planning. This is mostly use to plan `UNION` 
children in parallel. Defaults to the number of CPU cores on the system         
                                                                                
                                                                                
                                                                                
                    [...]
+| datafusion.execution.sort_spill_reservation_bytes          | 10485760   | 
Specifies the reserved memory for each spillable sort operation to facilitate 
an in-memory merge. When a sort operation spills to disk, the in-memory data 
must be sorted and merged before being written to a file. This setting reserves 
a specific amount of memory for that in-memory sort/merge process. Note: This 
setting is irrelevant if the sort operation cannot spill (i.e., if there's no 
`DiskManager` configured) [...]
+| datafusion.execution.sort_in_place_threshold_bytes         | 1048576    | 
When sorting, below what size should data be concatenated and sorted in a 
single RecordBatch rather than sorted in batches and merged.                    
                                                                                
                                                                                
                                                                                
                       [...]
 | datafusion.optimizer.enable_round_robin_repartition        | true       | 
When set to true, the physical plan optimizer will try to add round robin 
repartitioning to increase parallelism to leverage more CPU cores               
                                                                                
                                                                                
                                                                                
                       [...]
 | datafusion.optimizer.filter_null_join_keys                 | false      | 
When set to true, the optimizer will insert filters before a join between a 
nullable and non-nullable column to filter out nulls on the nullable side. This 
filter can add additional overhead when the file format does not fully support 
predicate push down.                                                            
                                                                                
                      [...]
 | datafusion.optimizer.repartition_aggregations              | true       | 
Should DataFusion repartition data using the aggregate keys to execute 
aggregates in parallel using the provided `target_partitions` level             
                                                                                
                                                                                
                                                                                
                          [...]

Reply via email to