This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-52 by this push:
     new 9797095e15 [branch-52] perf: sort replace free()->try_grow() pattern with try_resize() to reduce memory pool interactions (#20732)
9797095e15 is described below

commit 9797095e152749721bec07c0944fe664acaa0849
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu Mar 5 17:04:12 2026 -0500

    [branch-52] perf: sort replace free()->try_grow() pattern with try_resize() to reduce memory pool interactions (#20732)
    
    Backport #20729 to `branch-52`.
---
 datafusion/physical-plan/src/sorts/sort.rs | 50 ++++++++++++------------------
 1 file changed, 20 insertions(+), 30 deletions(-)

diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 3e8fdf1f3e..475738cca3 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -728,37 +728,27 @@ impl ExternalSorter {
             let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;
             drop(batch);
 
-            // Free the old reservation and grow it to match the actual sorted output size
-            reservation.free();
+            // Resize the reservation to match the actual sorted output size.
+            // Using try_resize avoids a release-then-reacquire cycle, which
+            // matters for MemoryPool implementations where grow/shrink have
+            // non-trivial cost (e.g. JNI calls in Comet).
+            let total_sorted_size: usize = sorted_batches
+                .iter()
+                .map(get_record_batch_memory_size)
+                .sum();
+            reservation
+                .try_resize(total_sorted_size)
+                .map_err(Self::err_with_oom_context)?;
 
-            Result::<_, DataFusionError>::Ok((schema, sorted_batches, reservation))
-        })
-        .then({
-            move |batches| async move {
-                match batches {
-                    Ok((schema, sorted_batches, mut reservation)) => {
-                        // Calculate the total size of sorted batches
-                        let total_sorted_size: usize = sorted_batches
-                            .iter()
-                            .map(get_record_batch_memory_size)
-                            .sum();
-                        reservation
-                            .try_grow(total_sorted_size)
-                            .map_err(Self::err_with_oom_context)?;
-
-                        // Wrap in ReservationStream to hold the reservation
-                        Ok(Box::pin(ReservationStream::new(
-                            Arc::clone(&schema),
-                            Box::pin(RecordBatchStreamAdapter::new(
-                                schema,
-                                futures::stream::iter(sorted_batches.into_iter().map(Ok)),
-                            )),
-                            reservation,
-                        )) as SendableRecordBatchStream)
-                    }
-                    Err(e) => Err(e),
-                }
-            }
+            // Wrap in ReservationStream to hold the reservation
+            Result::<_, DataFusionError>::Ok(Box::pin(ReservationStream::new(
+                Arc::clone(&schema),
+                Box::pin(RecordBatchStreamAdapter::new(
+                    schema,
+                    futures::stream::iter(sorted_batches.into_iter().map(Ok)),
+                )),
+                reservation,
+            )) as SendableRecordBatchStream)
         })
         .try_flatten()
         .map(move |batch| match batch {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to