rluvaton commented on code in PR #20314:
URL: https://github.com/apache/datafusion/pull/20314#discussion_r2841088503
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1110,17 +1129,85 @@ impl GroupedHashAggregateStream {
let Some(emit) = self.emit(EmitTo::All, true)? else {
return Ok(());
};
- let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?;
-
- // Spill sorted state to disk
- let spillfile = self
- .spill_state
- .spill_manager
- .spill_record_batch_by_size_and_return_max_batch_memory(
- &sorted,
- "HashAggSpill",
- self.batch_size,
- )?;
+
+ // Free accumulated state now that data has been emitted into `emit`.
+ // This must happen before reserving sort memory so the pool has room.
+ // Use 0 to minimize allocated capacity and maximize memory available
for sorting.
+ self.clear_shrink(0);
+ self.update_memory_reservation()?;
+
+ // Our first strategy is to simply sort the batch eagerly, this
requires the most peak memory(about 2x of the batch),
+ // but will also mean we can immediately drop the unsorted batch and
free its memory.
+ let sort_memory = get_reserved_bytes_for_record_batch(&emit)?;
+ let spillfile = match self.reservation.try_grow(sort_memory) {
+ Ok(_) => {
+ // Sort the batch into chunks and spill each chunk to disk.
This ensures we don't have to hold the entire
+ // batch in memory until the end of the spill, and can both
drop it and release memory pressure sooner,
+ // as spilling may take time and other things can happen in
the background.
+ let sorted = sort_batch_chunked(
+ &emit,
+ &self.spill_state.spill_expr,
+ self.batch_size,
+ )?;
+ drop(emit);
+
+ let new_sort_memory: usize =
+ sorted.iter().map(get_record_batch_memory_size).sum();
+ let mem_to_free = sort_memory.saturating_sub(new_sort_memory);
+ self.reservation.shrink(mem_to_free); // Mem pool should now
only hold the memory for the sorted batches.
+
+ // Spill sorted state to disk
+ let spillfile = self
+ .spill_state
+ .spill_manager
+ .spill_record_batch_iter_and_return_max_batch_memory(
+ sorted.into_iter().map(Ok),
+ "HashAggSpill",
+ )?;
+
+ // Shrink the remaining memory we allocated
+ self.reservation
+ .shrink(sort_memory.saturating_sub(mem_to_free));
+
+ spillfile
+ }
+ Err(_) => {
+ // However, if we don't have that peak memory, we can fallback
to sorting lazily.
+ // This means we hold the original batch for longer, but we
only require reserving the original batch's size,
+ // plus a fraction of it for each batch as we emit it and
write it to file.
+ // this is still 2x the batch memory at the *worst case*, but
the larger the batch, the smaller the fraction.
+ let batch_size_ratio = self.batch_size as f32 /
emit.num_rows() as f32;
+ let batch_memory = get_record_batch_memory_size(&emit);
+ let sort_memory =
+ batch_memory + (batch_memory as f32 * batch_size_ratio) as
usize;
+
+ // If we can't grow even that, we have no choice but to return
an error since we can't spill to disk without sorting the data first.
+ self.reservation.try_grow(sort_memory).map_err(|err| {
+ resources_datafusion_err!(
+ "Failed to reserve memory for sort during spill: {err}"
+ )
+ })?;
+
+ let sorted_iter = IncrementalSortIterator::new(
+ emit,
+ self.spill_state.spill_expr.clone(),
+ self.batch_size,
+ );
+ let spillfile = self
+ .spill_state
+ .spill_manager
+ .spill_record_batch_iter_and_return_max_batch_memory(
+ sorted_iter,
+ "HashAggSpill",
+ )?;
+
+ // Shrink the memory we allocated for sorting as the sorting
is fully done at this point.
+ self.reservation.shrink(sort_memory);
+
+ spillfile
+ }
+ };
Review Comment:
`sort_batch_chunked` seems to do the same thing as `IncrementalSortIterator`,
except that it collects the results at the end.
I would keep only `IncrementalSortIterator`: I don't see a benefit to having
the `sort_batch_chunked` branch, and removing it would simplify the code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]