This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-53 by this push:
new 35749607f5 [branch-53] perf: sort replace free()->try_grow() pattern
with try_resize() to reduce memory pool interactions (#20733)
35749607f5 is described below
commit 35749607f585b3bf25b66b7d2289c56c18d03e4f
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu Mar 5 23:05:19 2026 -0500
[branch-53] perf: sort replace free()->try_grow() pattern with try_resize()
to reduce memory pool interactions (#20733)
Backport #20729 to `branch-53`.
---
datafusion/physical-plan/src/sorts/sort.rs | 50 ++++++++++++------------------
1 file changed, 20 insertions(+), 30 deletions(-)
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index b3ea548d53..5b64f0b2a6 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -730,37 +730,27 @@ impl ExternalSorter {
// Sort the batch immediately and get all output batches
let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;
- // Free the old reservation and grow it to match the actual sorted output size
- reservation.free();
+ // Resize the reservation to match the actual sorted output size.
+ // Using try_resize avoids a release-then-reacquire cycle, which
+ // matters for MemoryPool implementations where grow/shrink have
+ // non-trivial cost (e.g. JNI calls in Comet).
+ let total_sorted_size: usize = sorted_batches
+ .iter()
+ .map(get_record_batch_memory_size)
+ .sum();
+ reservation
+ .try_resize(total_sorted_size)
+ .map_err(Self::err_with_oom_context)?;
- Result::<_, DataFusionError>::Ok((schema, sorted_batches, reservation))
- })
- .then({
- move |batches| async move {
- match batches {
- Ok((schema, sorted_batches, reservation)) => {
- // Calculate the total size of sorted batches
- let total_sorted_size: usize = sorted_batches
- .iter()
- .map(get_record_batch_memory_size)
- .sum();
- reservation
- .try_grow(total_sorted_size)
- .map_err(Self::err_with_oom_context)?;
-
- // Wrap in ReservationStream to hold the reservation
- Ok(Box::pin(ReservationStream::new(
- Arc::clone(&schema),
- Box::pin(RecordBatchStreamAdapter::new(
- schema,
- futures::stream::iter(sorted_batches.into_iter().map(Ok)),
- )),
- reservation,
- )) as SendableRecordBatchStream)
- }
- Err(e) => Err(e),
- }
- }
+ // Wrap in ReservationStream to hold the reservation
+ Result::<_, DataFusionError>::Ok(Box::pin(ReservationStream::new(
+ Arc::clone(&schema),
+ Box::pin(RecordBatchStreamAdapter::new(
+ Arc::clone(&schema),
+ futures::stream::iter(sorted_batches.into_iter().map(Ok)),
+ )),
+ reservation,
+ )) as SendableRecordBatchStream)
})
.try_flatten()
.map(move |batch| match batch {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]