kosiew commented on code in PR #20642:
URL: https://github.com/apache/datafusion/pull/20642#discussion_r2917246357


##########
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##########
@@ -253,7 +253,12 @@ impl MultiLevelMergeBuilder {
 
             // Need to merge multiple streams
             (_, _) => {
-                let mut memory_reservation = self.reservation.new_empty();
+                // Transfer any pre-reserved bytes (from sort_spill_reservation_bytes)
+                // to the merge memory reservation. This prevents starvation when
+                // concurrent sort partitions compete for pool memory: the pre-reserved
+                // bytes cover spill file buffer reservations without additional pool
+                // allocation.
+                let mut memory_reservation = self.reservation.take();

Review Comment:
   It looks like merge_sorted_runs_within_mem_limit() is transferring 
self.reservation into memory_reservation before it actually knows whether any 
spill files will be merged. If the builder already has enough in-memory streams 
to satisfy minimum_number_of_required_streams, but the first spill file still 
cannot fit, then get_sorted_spill_files_to_merge() could legitimately return 
zero spill files.
   
   In that situation, is_only_merging_memory_streams would become true, but 
memory_reservation would still contain the bytes taken from self.reservation. 
That seems like it could trigger the assertion at lines 297–302 even though 
falling back to an all-in-memory merge is valid.
   
   My understanding is that this creates a behavior regression in the mixed 
{sorted_streams + sorted_spill_files} path. Should the reservation transfer 
instead happen only after at least one spill file is selected, or should the 
unused reservation be returned to the all-in-memory merge path rather than 
being asserted away? 🤔
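
   As a rough illustration of the first option (transfer only after a spill 
   file has been selected), here is a minimal sketch. It uses a mock type 
   instead of the real MemoryReservation, and both MockReservation and 
   reservation_for_merge are hypothetical names that do not exist in the PR. 
   It only shows the intended bookkeeping, not the actual builder code:

```rust
/// Mock stand-in for DataFusion's `MemoryReservation` (assumption: only the
/// size bookkeeping matters for this sketch; there is no real pool behind it).
#[derive(Debug)]
pub struct MockReservation {
    size: usize,
}

impl MockReservation {
    pub fn with_size(size: usize) -> Self {
        Self { size }
    }

    pub fn size(&self) -> usize {
        self.size
    }

    /// Mirrors `new_empty()`: a fresh reservation holding zero bytes.
    pub fn new_empty(&self) -> Self {
        Self { size: 0 }
    }

    /// Mirrors `take()`: moves all bytes out, leaving `self` at zero.
    pub fn take(&mut self) -> Self {
        Self {
            size: std::mem::take(&mut self.size),
        }
    }
}

/// Hypothetical helper: hand the pre-reserved bytes to the merge reservation
/// only when at least one spill file was actually selected. Otherwise the
/// builder keeps its bytes and the merge starts from an empty reservation,
/// so an "only merging memory streams" check would still see zero bytes.
pub fn reservation_for_merge(
    builder: &mut MockReservation,
    selected_spill_files: usize,
) -> MockReservation {
    if selected_spill_files > 0 {
        builder.take()
    } else {
        builder.new_empty()
    }
}
```

   With this shape, the all-in-memory fallback never sees a non-empty merge 
   reservation, and the pre-reserved bytes stay with the builder for a later 
   merge pass.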



##########
datafusion/physical-plan/src/sorts/builder.rs:
##########
@@ -59,19 +74,29 @@ impl BatchBuilder {
         batch_size: usize,
         reservation: MemoryReservation,
     ) -> Self {
+        let initial_reservation = reservation.size();
         Self {
             schema,
             batches: Vec::with_capacity(stream_count * 2),
             cursors: vec![BatchCursor::default(); stream_count],
             indices: Vec::with_capacity(batch_size),
             reservation,
+            batches_mem_used: 0,
+            initial_reservation,
         }
     }
 
     /// Append a new batch in `stream_idx`
     pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
-        self.reservation
-            .try_grow(get_record_batch_memory_size(&batch))?;
+        let size = get_record_batch_memory_size(&batch);
+        self.batches_mem_used += size;
+        // Only request additional memory from the pool when actual batch
+        // usage exceeds the current reservation (which may include
+        // pre-reserved bytes from sort_spill_reservation_bytes).
+        if self.batches_mem_used > self.reservation.size() {
+            self.reservation
+                .try_grow(self.batches_mem_used - self.reservation.size())?;

Review Comment:
   This “grow only when usage exceeds the current reservation” pattern also 
appears in get_sorted_spill_files_to_merge in multi_level_merge.rs. 
   I think extracting it into a shared helper would make the intended 
invariant easier to verify.
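
   Something along these lines is what I have in mind. This is only a sketch 
   against a mock reservation: PooledMockReservation and grow_to_cover are 
   hypothetical names, and the real helper would take a MemoryReservation and 
   return DataFusion's Result type instead of a String error:

```rust
/// Mock stand-in for a pool-backed reservation (assumption: `pool_free`
/// models the bytes still available in the shared memory pool).
#[derive(Debug)]
pub struct PooledMockReservation {
    size: usize,
    pool_free: usize,
}

impl PooledMockReservation {
    pub fn new(size: usize, pool_free: usize) -> Self {
        Self { size, pool_free }
    }

    pub fn size(&self) -> usize {
        self.size
    }

    /// Grow by `additional` bytes, failing if the pool cannot supply them.
    pub fn try_grow(&mut self, additional: usize) -> Result<(), String> {
        if additional > self.pool_free {
            return Err(format!(
                "cannot reserve {additional} bytes, only {} available",
                self.pool_free
            ));
        }
        self.pool_free -= additional;
        self.size += additional;
        Ok(())
    }
}

/// Hypothetical helper capturing the invariant in one place: after a
/// successful call, the reservation holds at least `required` bytes, and the
/// pool is only asked for the shortfall beyond what is already reserved.
pub fn grow_to_cover(
    reservation: &mut PooledMockReservation,
    required: usize,
) -> Result<(), String> {
    let current = reservation.size();
    if required > current {
        reservation.try_grow(required - current)?;
    }
    Ok(())
}
```

   Both call sites could then reduce to a single call with their own notion 
   of required bytes, which keeps the pre-reserved-bytes accounting in one 
   spot.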



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

