kosiew commented on code in PR #20642:
URL: https://github.com/apache/datafusion/pull/20642#discussion_r2917246357
##########
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##########
@@ -253,7 +253,12 @@ impl MultiLevelMergeBuilder {
// Need to merge multiple streams
(_, _) => {
- let mut memory_reservation = self.reservation.new_empty();
+ // Transfer any pre-reserved bytes (from sort_spill_reservation_bytes)
+ // to the merge memory reservation. This prevents starvation when
+ // concurrent sort partitions compete for pool memory: the pre-reserved
+ // bytes cover spill file buffer reservations without additional pool
+ // allocation.
+ let mut memory_reservation = self.reservation.take();
Review Comment:
It looks like merge_sorted_runs_within_mem_limit() is transferring
self.reservation into memory_reservation before it actually knows whether any
spill files will be merged. If the builder already has enough in-memory streams
to satisfy minimum_number_of_required_streams, but the first spill file still
cannot fit, then get_sorted_spill_files_to_merge() could legitimately return
zero spill files.
In that situation, is_only_merging_memory_streams would become true, but
memory_reservation would still contain the bytes taken from self.reservation.
That seems like it could trigger the assertion at lines 297–302 even though
falling back to an all-in-memory merge is valid.
My understanding is that this creates a behavior regression in the mixed
{sorted_streams + sorted_spill_files} path. Should the reservation transfer
instead happen only after at least one spill file is selected, or should the
unused reservation be returned to the all-in-memory merge path rather than
being asserted away? 🤔
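For illustration, here is a toy model of the first option, which defers the transfer until at least one spill file has been selected. Everything in it is hypothetical. `Reservation`, `MergePlan`, `plan_merge`, and the budget rule (pre-reserved bytes plus pool headroom) are simplifications, not DataFusion's `MemoryReservation` or the real `get_sorted_spill_files_to_merge`. The model also assumes the in-memory streams already satisfy `minimum_number_of_required_streams`.

```rust
/// Hypothetical stand-in for a memory reservation: just a byte count.
/// This is NOT DataFusion's `MemoryReservation`.
#[derive(Debug, Default, PartialEq)]
struct Reservation {
    size: usize,
}

impl Reservation {
    /// Moves every reserved byte into a new reservation and leaves `self`
    /// empty, modeled on the `self.reservation.take()` call in the diff.
    fn take(&mut self) -> Reservation {
        Reservation { size: std::mem::take(&mut self.size) }
    }
}

/// Hypothetical outcome of one merge step in this toy model.
#[derive(Debug, PartialEq)]
enum MergePlan {
    /// No spill file fits: merge only the in-memory streams and leave the
    /// builder's pre-reserved bytes where they are.
    InMemoryOnly,
    /// Merge `spill_files` spill files, backed by `reservation`.
    WithSpills { spill_files: usize, reservation: Reservation },
}

/// Selects spill files first and transfers the pre-reserved bytes only once
/// at least one file has been selected.
///
/// `spill_buffer_sizes` are the (assumed) bytes each spill file needs for its
/// read buffer; `pool_headroom` is the (assumed) free space left in the pool.
fn plan_merge(
    builder_reservation: &mut Reservation,
    spill_buffer_sizes: &[usize],
    pool_headroom: usize,
) -> MergePlan {
    let budget = builder_reservation.size + pool_headroom;
    let mut needed = 0;
    let mut selected = 0;
    for &bytes in spill_buffer_sizes {
        if needed + bytes > budget {
            break; // the next spill file does not fit
        }
        needed += bytes;
        selected += 1;
    }

    if selected == 0 {
        // Valid fallback: nothing was taken, so no leftover bytes remain for
        // a "memory streams only" assertion to trip over.
        return MergePlan::InMemoryOnly;
    }

    // Only now move the pre-reserved bytes into the merge reservation...
    let mut reservation = builder_reservation.take();
    // ...and top it up to what the selected files need (a stand-in for
    // growing from the pool by the shortfall only).
    reservation.size = reservation.size.max(needed);
    MergePlan::WithSpills { spill_files: selected, reservation }
}
```

The second option would keep `take()` where it is and change only the fallback branch. In that branch, the taken bytes would be handed to the all-in-memory merge instead of asserting that the reservation is empty.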
##########
datafusion/physical-plan/src/sorts/builder.rs:
##########
@@ -59,19 +74,29 @@ impl BatchBuilder {
batch_size: usize,
reservation: MemoryReservation,
) -> Self {
+ let initial_reservation = reservation.size();
Self {
schema,
batches: Vec::with_capacity(stream_count * 2),
cursors: vec![BatchCursor::default(); stream_count],
indices: Vec::with_capacity(batch_size),
reservation,
+ batches_mem_used: 0,
+ initial_reservation,
}
}
/// Append a new batch in `stream_idx`
pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
- self.reservation
- .try_grow(get_record_batch_memory_size(&batch))?;
+ let size = get_record_batch_memory_size(&batch);
+ self.batches_mem_used += size;
+ // Only request additional memory from the pool when actual batch
+ // usage exceeds the current reservation (which may include
+ // pre-reserved bytes from sort_spill_reservation_bytes).
+ if self.batches_mem_used > self.reservation.size() {
+ self.reservation
+ .try_grow(self.batches_mem_used - self.reservation.size())?;
Review Comment:
This “grow only when usage exceeds current reservation” pattern also appears in
get_sorted_spill_files_to_merge in multi_level_merge.rs.
I think extracting it into a shared helper would make the intended invariant
(the reservation grows only by the shortfall between actual usage and its
current size) easier to check.
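A shared helper along these lines might look like the sketch below. `Pool`, `ResourcesExhausted`, `Reservation`, `try_grow_to`, and `BatchTracker` are hypothetical names for illustration, not DataFusion's `MemoryPool`/`MemoryReservation` API. The idea is that both `push_batch` and `get_sorted_spill_files_to_merge` would call a single function that grows the reservation only by the shortfall.

```rust
/// Hypothetical fixed-size memory pool that only tracks bytes in use.
/// This is NOT DataFusion's `MemoryPool`.
struct Pool {
    limit: usize,
    reserved: usize,
}

/// Hypothetical error returned when the pool cannot satisfy a request.
#[allow(dead_code)]
#[derive(Debug)]
struct ResourcesExhausted {
    requested: usize,
    available: usize,
}

/// Hypothetical reservation: a byte count drawn from a `Pool`.
struct Reservation {
    size: usize,
}

impl Reservation {
    /// Grows by `additional` bytes, failing if the pool lacks room.
    fn try_grow(&mut self, pool: &mut Pool, additional: usize) -> Result<(), ResourcesExhausted> {
        let available = pool.limit.saturating_sub(pool.reserved);
        if additional > available {
            return Err(ResourcesExhausted { requested: additional, available });
        }
        pool.reserved += additional;
        self.size += additional;
        Ok(())
    }

    /// Hypothetical shared helper: ensure at least `target` bytes are
    /// reserved, asking the pool only for the shortfall. Bytes already held
    /// (e.g. pre-reserved ones) are reused, and the reservation never shrinks.
    fn try_grow_to(&mut self, pool: &mut Pool, target: usize) -> Result<(), ResourcesExhausted> {
        if target > self.size {
            self.try_grow(pool, target - self.size)?;
        }
        Ok(())
    }
}

/// Toy version of the `push_batch` accounting from the diff.
struct BatchTracker {
    batches_mem_used: usize,
    reservation: Reservation,
}

impl BatchTracker {
    fn push_batch_bytes(&mut self, pool: &mut Pool, size: usize) -> Result<(), ResourcesExhausted> {
        let new_total = self.batches_mem_used + size;
        self.reservation.try_grow_to(pool, new_total)?;
        // Commit the counter only after the reservation covers it.
        self.batches_mem_used = new_total;
        Ok(())
    }
}
```

This sketch differs from the diff in one deliberate way: it updates `batches_mem_used` only after the grow succeeds. As a result, a failed request does not leave the counter ahead of the reservation.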
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.