This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-52 by this push:
new 9797095e15 [branch-52] perf: sort replace free()->try_grow() pattern
with try_resize() to reduce memory pool interactions (#20732)
9797095e15 is described below
commit 9797095e152749721bec07c0944fe664acaa0849
Author: Matt Butrovich <[email protected]>
AuthorDate: Thu Mar 5 17:04:12 2026 -0500
[branch-52] perf: sort replace free()->try_grow() pattern with try_resize()
to reduce memory pool interactions (#20732)
Backport #20729 to `branch-52`.
---
datafusion/physical-plan/src/sorts/sort.rs | 50 ++++++++++++------------------
1 file changed, 20 insertions(+), 30 deletions(-)
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 3e8fdf1f3e..475738cca3 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -728,37 +728,27 @@ impl ExternalSorter {
let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;
drop(batch);
- // Free the old reservation and grow it to match the actual sorted output size
- reservation.free();
+ // Resize the reservation to match the actual sorted output size.
+ // Using try_resize avoids a release-then-reacquire cycle, which
+ // matters for MemoryPool implementations where grow/shrink have
+ // non-trivial cost (e.g. JNI calls in Comet).
+ let total_sorted_size: usize = sorted_batches
+ .iter()
+ .map(get_record_batch_memory_size)
+ .sum();
+ reservation
+ .try_resize(total_sorted_size)
+ .map_err(Self::err_with_oom_context)?;
- Result::<_, DataFusionError>::Ok((schema, sorted_batches, reservation))
- })
- .then({
- move |batches| async move {
- match batches {
- Ok((schema, sorted_batches, mut reservation)) => {
- // Calculate the total size of sorted batches
- let total_sorted_size: usize = sorted_batches
- .iter()
- .map(get_record_batch_memory_size)
- .sum();
- reservation
- .try_grow(total_sorted_size)
- .map_err(Self::err_with_oom_context)?;
-
- // Wrap in ReservationStream to hold the reservation
- Ok(Box::pin(ReservationStream::new(
- Arc::clone(&schema),
- Box::pin(RecordBatchStreamAdapter::new(
- schema,
- futures::stream::iter(sorted_batches.into_iter().map(Ok)),
- )),
- reservation,
- )) as SendableRecordBatchStream)
- }
- Err(e) => Err(e),
- }
- }
+ // Wrap in ReservationStream to hold the reservation
+ Result::<_, DataFusionError>::Ok(Box::pin(ReservationStream::new(
+ Arc::clone(&schema),
+ Box::pin(RecordBatchStreamAdapter::new(
+ schema,
+ futures::stream::iter(sorted_batches.into_iter().map(Ok)),
+ )),
+ reservation,
+ )) as SendableRecordBatchStream)
})
.try_flatten()
.map(move |batch| match batch {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]