xudong963 commented on code in PR #20642:
URL: https://github.com/apache/datafusion/pull/20642#discussion_r2930508930
##########
datafusion/physical-plan/src/sorts/builder.rs:
##########
@@ -59,19 +74,29 @@ impl BatchBuilder {
batch_size: usize,
reservation: MemoryReservation,
) -> Self {
+ let initial_reservation = reservation.size();
Self {
schema,
batches: Vec::with_capacity(stream_count * 2),
cursors: vec![BatchCursor::default(); stream_count],
indices: Vec::with_capacity(batch_size),
reservation,
+ batches_mem_used: 0,
+ initial_reservation,
}
}
/// Append a new batch in `stream_idx`
pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
- self.reservation
- .try_grow(get_record_batch_memory_size(&batch))?;
+ let size = get_record_batch_memory_size(&batch);
+ self.batches_mem_used += size;
+ // Only request additional memory from the pool when actual batch
+ // usage exceeds the current reservation (which may include
+ // pre-reserved bytes from sort_spill_reservation_bytes).
+ if self.batches_mem_used > self.reservation.size() {
+ self.reservation
+ .try_grow(self.batches_mem_used - self.reservation.size())?;
Review Comment:
https://github.com/apache/datafusion/pull/20642/changes/c478d435ba25b2f8b96cf3cb44d8edf8528a354f
##########
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##########
@@ -253,7 +253,12 @@ impl MultiLevelMergeBuilder {
// Need to merge multiple streams
(_, _) => {
- let mut memory_reservation = self.reservation.new_empty();
+ // Transfer any pre-reserved bytes (from sort_spill_reservation_bytes)
+ // to the merge memory reservation. This prevents starvation when
+ // concurrent sort partitions compete for pool memory: the pre-reserved
+ // bytes cover spill file buffer reservations without additional pool
+ // allocation.
+ let mut memory_reservation = self.reservation.take();
Review Comment:
https://github.com/apache/datafusion/pull/20642/changes/c478d435ba25b2f8b96cf3cb44d8edf8528a354f
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]