pepijnve commented on code in PR #19287:
URL: https://github.com/apache/datafusion/pull/19287#discussion_r2620829492


##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1151,26 +1198,18 @@ impl GroupedHashAggregateStream {
     /// Conduct a streaming merge sort between the batch and spilled data. 
Since the stream is fully
     /// sorted, set `self.group_ordering` to Full, then later we can read with 
[`EmitTo::First`].
     fn update_merged_stream(&mut self) -> Result<()> {
-        let Some(batch) = self.emit(EmitTo::All, true)? else {
-            return Ok(());
-        };
+        // Spill the last remaining rows (if any) to free up as much memory as 
possible.
+        // Since we're already spilling, we can be sure we're memory 
constrained.
+        // Creating an extra spill file won't make much of a difference.
+        self.spill()?;
+
         // clear up memory for streaming_merge
         self.clear_all();
         self.update_memory_reservation()?;
-        let mut streams: Vec<SendableRecordBatchStream> = vec![];

Review Comment:
   Yes, that's correct. By spilling the last batch the special handling is no 
longer required. Clean code wasn't really the main driver though. Accounting 
for `batch` in the memory reservation was.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to