rluvaton commented on code in PR #20314:
URL: https://github.com/apache/datafusion/pull/20314#discussion_r2841088503


##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1110,17 +1129,85 @@ impl GroupedHashAggregateStream {
         let Some(emit) = self.emit(EmitTo::All, true)? else {
             return Ok(());
         };
-        let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?;
-
-        // Spill sorted state to disk
-        let spillfile = self
-            .spill_state
-            .spill_manager
-            .spill_record_batch_by_size_and_return_max_batch_memory(
-                &sorted,
-                "HashAggSpill",
-                self.batch_size,
-            )?;
+
+        // Free accumulated state now that data has been emitted into `emit`.
+        // This must happen before reserving sort memory so the pool has room.
+        // Use 0 to minimize allocated capacity and maximize memory available 
for sorting.
+        self.clear_shrink(0);
+        self.update_memory_reservation()?;
+
+        // Our first strategy is to simply sort the batch eagerly, this 
requires the most peak memory(about 2x of the batch),
+        // but will also mean we can immediately drop the unsorted batch and 
free its memory.
+        let sort_memory = get_reserved_bytes_for_record_batch(&emit)?;
+        let spillfile = match self.reservation.try_grow(sort_memory) {
+            Ok(_) => {
+                // Sort the batch into chunks and spill each chunk to disk. 
This ensures we don't have to hold the entire
+                // batch in memory until the end of the spill, and can both 
drop it and release memory pressure sooner,
+                // as spilling may take time and other things can happen in 
the background.
+                let sorted = sort_batch_chunked(
+                    &emit,
+                    &self.spill_state.spill_expr,
+                    self.batch_size,
+                )?;
+                drop(emit);
+
+                let new_sort_memory: usize =
+                    sorted.iter().map(get_record_batch_memory_size).sum();
+                let mem_to_free = sort_memory.saturating_sub(new_sort_memory);
+                self.reservation.shrink(mem_to_free); // Mem pool should now 
only hold the memory for the sorted batches.
+
+                // Spill sorted state to disk
+                let spillfile = self
+                    .spill_state
+                    .spill_manager
+                    .spill_record_batch_iter_and_return_max_batch_memory(
+                        sorted.into_iter().map(Ok),
+                        "HashAggSpill",
+                    )?;
+
+                // Shrink the remaining memory we allocated
+                self.reservation
+                    .shrink(sort_memory.saturating_sub(mem_to_free));
+
+                spillfile
+            }
+            Err(_) => {
+                // However, if we don't have that peak memory, we can fallback 
to sorting lazily.
+                // This means we hold the original batch for longer, but we 
only require reserving the original batch's size,
+                // plus a fraction of it for each batch as we emit it and 
write it to file.
+                // this is still 2x the batch memory at the *worst case*, but 
the larger the batch, the smaller the fraction.
+                let batch_size_ratio = self.batch_size as f32 / 
emit.num_rows() as f32;
+                let batch_memory = get_record_batch_memory_size(&emit);
+                let sort_memory =
+                    batch_memory + (batch_memory as f32 * batch_size_ratio) as 
usize;
+
+                // If we can't grow even that, we have no choice but to return 
an error since we can't spill to disk without sorting the data first.
+                self.reservation.try_grow(sort_memory).map_err(|err| {
+                    resources_datafusion_err!(
+                        "Failed to reserve memory for sort during spill: {err}"
+                    )
+                })?;
+
+                let sorted_iter = IncrementalSortIterator::new(
+                    emit,
+                    self.spill_state.spill_expr.clone(),
+                    self.batch_size,
+                );
+                let spillfile = self
+                    .spill_state
+                    .spill_manager
+                    .spill_record_batch_iter_and_return_max_batch_memory(
+                        sorted_iter,
+                        "HashAggSpill",
+                    )?;
+
+                // Shrink the memory we allocated for sorting as the sorting 
is fully done at this point.
+                self.reservation.shrink(sort_memory);
+
+                spillfile
+            }
+        };

Review Comment:
   `sort_batch_chunked` seems to do the same thing as `IncrementalSortIterator`, 
except that it collects all of the sorted chunks at the end.
   I would keep only `IncrementalSortIterator`: I don't see a benefit in having 
the `sort_batch_chunked` branch, and removing it will simplify the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to