yjshen commented on code in PR #7130:
URL: https://github.com/apache/arrow-datafusion/pull/7130#discussion_r1287846296


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -455,26 +482,27 @@ impl ExternalSorter {
         assert_ne!(self.in_mem_batches.len(), 0);
         if self.in_mem_batches.len() == 1 {
             let batch = self.in_mem_batches.remove(0);
-            let stream = self.sort_batch_stream(batch, metrics)?;
-            self.in_mem_batches.clear();
-            return Ok(stream);
+            let reservation = self.reservation.take();
+            return self.sort_batch_stream(batch, metrics, reservation);
         }
 
-        // If less than 1MB of in-memory data, concatenate and sort in place
-        //
-        // This is a very rough heuristic and likely could be refined further
-        if self.reservation.size() < 1048576 {
+        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
+        if self.reservation.size() < self.sort_in_place_threshold_bytes {
             // Concatenate memory batches together and sort
             let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
             self.in_mem_batches.clear();
-            return self.sort_batch_stream(batch, metrics);
+            self.reservation.try_resize(batch.get_array_memory_size())?;
+            let reservation = self.reservation.take();
+            return self.sort_batch_stream(batch, metrics, reservation);
         }
 
         let streams = std::mem::take(&mut self.in_mem_batches)
             .into_iter()
             .map(|batch| {
                 let metrics = self.metrics.baseline.intermediate();
-                Ok(spawn_buffered(self.sort_batch_stream(batch, metrics)?, 1))
+                let reservation = self.reservation.split(batch.get_array_memory_size());

Review Comment:
   👍 
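   
   To make the ownership handoff above concrete, here is a minimal sketch of how `MemoryReservation::split` and `MemoryReservation::take` behave (using the public `memory_pool` API as this PR uses it; the pool size and byte counts are made up for illustration):
   
   ```rs
   use std::sync::Arc;
   use datafusion::error::Result;
   use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
   
   fn example() -> Result<()> {
       let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
       let mut reservation = MemoryConsumer::new("in_mem_batches").register(&pool);
       reservation.try_grow(100_000)?;
   
       // `split` carves bytes out of the existing reservation into a new one,
       // without asking the pool for more memory.
       let per_batch = reservation.split(40_000);
       assert_eq!(per_batch.size(), 40_000);
       assert_eq!(reservation.size(), 60_000);
   
       // `take` moves whatever is left into a new reservation, leaving the
       // original empty (it can be grown again later).
       let remaining = reservation.take();
       assert_eq!(remaining.size(), 60_000);
       assert_eq!(reservation.size(), 0);
       Ok(())
   }
   ```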



##########
datafusion/common/src/config.rs:
##########
@@ -235,6 +235,23 @@ config_namespace! {
         ///
         /// Defaults to the number of CPU cores on the system
         pub planning_concurrency: usize, default = num_cpus::get()
+
+        /// How much memory is set aside, for each spillable sort, to
+        /// ensure an in-memory merge can occur. This setting has no
+        /// if the sort can not spill (there is no `DiskManager`
+        /// configured)
+        ///
+        /// As part of spilling to disk, in memory data must be sorted
+        /// / merged before writing the file. This in-memory
+        /// sort/merge requires memory as well, so To avoid allocating
+        /// once memory is exhausted, DataFusion sets aside this
+        /// many bytes before.

Review Comment:
   Maybe:
   
   ```rs
   /// Specifies the reserved memory for each spillable sort operation to
   /// facilitate an in-memory merge.
   ///
   /// When a sort operation spills to disk, the in-memory data must be
   /// sorted and merged before being written to a file. This setting reserves
   /// a specific amount of memory for that in-memory sort/merge process.
   ///
   /// Note: This setting is irrelevant if the sort operation cannot spill
   /// (i.e., if there's no `DiskManager` configured).
   ```
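   
   It might also be worth showing in the docs how the knob is set. A rough sketch, assuming the option ends up exposed as `datafusion.execution.sort_spill_reservation_bytes` (key name inferred from this PR, not verified here):
   
   ```rs
   use datafusion::prelude::{SessionConfig, SessionContext};
   
   // Reserve 16 MiB per spillable sort for the final in-memory merge.
   let config = SessionConfig::new().set_usize(
       "datafusion.execution.sort_spill_reservation_bytes",
       16 * 1024 * 1024,
   );
   let ctx = SessionContext::with_config(config);
   ```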



##########
datafusion/core/tests/memory_limit.rs:
##########
@@ -17,12 +17,21 @@
 
 //! This module contains tests for limiting memory at runtime in DataFusion
 
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{Int32Type, SchemaRef};
 use arrow::record_batch::RecordBatch;
+use arrow_array::{ArrayRef, DictionaryArray};
+use arrow_schema::SortOptions;
+use async_trait::async_trait;
+use datafusion::assert_batches_eq;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::common::batch_byte_size;

Review Comment:
   We would probably remove this method and use `RecordBatch::get_array_memory_size` in the repo.
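   
   i.e. roughly the following (a throwaway example, just to show the call; the helper name and return type are only for this sketch):
   
   ```rs
   use std::sync::Arc;
   use arrow::array::{ArrayRef, Int32Array};
   use arrow::record_batch::RecordBatch;
   
   fn batch_size_example() -> arrow::error::Result<usize> {
       let batch = RecordBatch::try_from_iter([(
           "a",
           Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
       )])?;
       // The same number `batch_byte_size` would report, straight from arrow-rs.
       Ok(batch.get_array_memory_size())
   }
   ```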



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -374,6 +393,11 @@ impl ExternalSorter {
             return Ok(());
         }
 
+        // Release the memory reserved for merge back to the pool so
+        // there is some left when `in_mem_sort_stream` requests an
+        // allocation.
+        self.merge_reservation.free();

Review Comment:
   :+1:
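   
   For anyone skimming the thread: the free-then-regrow pattern here, sketched against the public `memory_pool` API (the 64 KiB figure is invented; the real amount comes from `sort_spill_reservation_bytes`):
   
   ```rs
   use std::sync::Arc;
   use datafusion::error::Result;
   use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
   
   fn merge_reservation_example() -> Result<()> {
       let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
       let mut merge_reservation = MemoryConsumer::new("merge").register(&pool);
   
       // Pre-reserve space so the final in-memory merge cannot be starved.
       merge_reservation.try_grow(64 * 1024)?;
   
       // Hand it back before sorting so `in_mem_sort_stream` can allocate ...
       merge_reservation.free();
   
       // ... the in-memory sort / spill runs here, drawing from the pool ...
   
       // ... then re-reserve once those batches have been released.
       merge_reservation.try_grow(64 * 1024)?;
       Ok(())
   }
   ```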



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -210,23 +210,37 @@ struct ExternalSorter {
     /// If Some, the maximum number of output rows that will be
     /// produced.
     fetch: Option<usize>,
-    /// Memory usage tracking
+    /// Reservation for in_mem_batches
     reservation: MemoryReservation,
-    /// The partition id that this Sort is handling (for identification)
-    partition_id: usize,
-    /// A handle to the runtime to get Disk spill files
+    /// Reservation for the merging of in-memory batches. If the sort
+    /// might spill, `sort_spill_reservation_bytes` will be
+    /// pre-reserved to ensure there is some space for this sort/merg.

Review Comment:
   ```suggestion
       /// pre-reserved to ensure there is some space for this sort/merge.
   ```


