pepijnve commented on code in PR #19287:
URL: https://github.com/apache/datafusion/pull/19287#discussion_r2620829492
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1151,26 +1198,18 @@ impl GroupedHashAggregateStream {
/// Conduct a streaming merge sort between the batch and spilled data. Since the stream is fully
/// sorted, set `self.group_ordering` to Full, then later we can read with [`EmitTo::First`].
fn update_merged_stream(&mut self) -> Result<()> {
- let Some(batch) = self.emit(EmitTo::All, true)? else {
- return Ok(());
- };
+ // Spill the last remaining rows (if any) to free up as much memory as possible.
+ // Since we're already spilling, we can be sure we're memory constrained.
+ // Creating an extra spill file won't make much of a difference.
+ self.spill()?;
+
// clear up memory for streaming_merge
self.clear_all();
self.update_memory_reservation()?;
- let mut streams: Vec<SendableRecordBatchStream> = vec![];
Review Comment:
Yes, that's correct. By spilling the last batch, the special handling is no
longer required. Cleaner code wasn't really the main driver, though;
accounting for `batch` in the memory reservation was.
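
To illustrate the pattern being discussed, here is a minimal, hypothetical sketch (not DataFusion's actual API; `SpillingAggregator`, `spill`, and `merge_spills` are invented names, and plain sorted vectors stand in for spill files and the k-way streaming merge). The point it shows: if the final buffered batch is spilled up front and its reservation released, the merge step only ever reads spill files and needs no special case for a leftover in-memory batch.

```rust
// Hypothetical, simplified model of "spill everything before merging".
// All names and types here are illustrative, not DataFusion's real ones.
struct SpillingAggregator {
    in_memory_rows: Vec<u64>,   // stand-in for buffered group state
    spill_files: Vec<Vec<u64>>, // stand-in for sorted spill files on disk
    reserved_bytes: usize,      // stand-in for the memory reservation
}

impl SpillingAggregator {
    /// Move all buffered rows into a new "spill file" and release
    /// their memory reservation. A no-op if nothing is buffered,
    /// so no empty spill file is created.
    fn spill(&mut self) {
        if self.in_memory_rows.is_empty() {
            return;
        }
        let mut rows = std::mem::take(&mut self.in_memory_rows);
        rows.sort_unstable(); // each spill file is individually sorted
        self.spill_files.push(rows);
        self.reserved_bytes = 0; // buffered state is now on "disk"
    }

    /// Spill any remaining rows first, then merge the sorted spill
    /// files. Because nothing stays buffered, the merge input needs
    /// no special handling for a final in-memory batch.
    fn merge_spills(&mut self) -> Vec<u64> {
        self.spill(); // the change discussed in the review
        let mut merged: Vec<u64> =
            self.spill_files.drain(..).flatten().collect();
        merged.sort_unstable(); // stand-in for a k-way streaming merge
        merged
    }
}

fn main() {
    let mut agg = SpillingAggregator {
        in_memory_rows: vec![5, 1],          // the "last batch"
        spill_files: vec![vec![2, 4], vec![3]], // earlier spills
        reserved_bytes: 64,
    };
    let out = agg.merge_spills();
    assert_eq!(out, vec![1, 2, 3, 4, 5]);
    assert_eq!(agg.reserved_bytes, 0); // reservation fully released
    println!("{:?}", out);
}
```

The trade-off matches the comment above: one extra (small) spill file, in exchange for not having to account for the final batch in the memory reservation during the merge.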
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]