pepijnve commented on code in PR #19287:
URL: https://github.com/apache/datafusion/pull/19287#discussion_r2620821300
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1151,26 +1198,18 @@ impl GroupedHashAggregateStream {
/// Conduct a streaming merge sort between the batch and spilled data. Since the stream is fully
/// sorted, set `self.group_ordering` to Full, then later we can read with [`EmitTo::First`].
fn update_merged_stream(&mut self) -> Result<()> {
- let Some(batch) = self.emit(EmitTo::All, true)? else {
- return Ok(());
- };
+ // Spill the last remaining rows (if any) to free up as much memory as possible.
+ // Since we're already spilling, we can be sure we're memory constrained.
+ // Creating an extra spill file won't make much of a difference.
+ self.spill()?;
Review Comment:
I had my doubts about this one myself because of the potential performance
regression. On the other hand, the previous code was kind of cheating: the
last batch was held in memory and, unless I'm mistaken, was no longer
accounted for in the memory reservation.
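
To make the accounting concern concrete, here is a minimal, self-contained sketch. It does not use DataFusion's types: `Reservation`, `AggState`, `emit_all`, and the two helper functions are hypothetical stand-ins, and the assumption that emitting shrinks the reservation while the emitted batch stays alive is my reading of the old code, not something I have verified.

```rust
use std::mem::size_of;

/// Hypothetical stand-in for a memory reservation: the number of bytes
/// the operator has declared to the memory pool.
#[derive(Debug, Default)]
struct Reservation {
    reserved: usize,
}

/// Hypothetical aggregation state: buffered group values plus spilled runs.
struct AggState {
    buffered: Vec<u64>,
    /// Each inner Vec stands in for one sorted spill file on disk.
    spills: Vec<Vec<u64>>,
    reservation: Reservation,
}

impl AggState {
    fn new(rows: Vec<u64>) -> Self {
        let mut state = AggState {
            buffered: rows,
            spills: Vec::new(),
            reservation: Reservation::default(),
        };
        state.update_reservation();
        state
    }

    fn buffered_bytes(&self) -> usize {
        self.buffered.len() * size_of::<u64>()
    }

    /// The reservation only ever reflects what is still in `buffered`.
    fn update_reservation(&mut self) {
        self.reservation.reserved = self.buffered_bytes();
    }

    /// Old approach (assumed): hand the remaining rows out as an in-memory
    /// batch. The state is now empty, so the reservation drops to zero even
    /// though the returned batch still occupies memory.
    fn emit_all(&mut self) -> Vec<u64> {
        let batch = std::mem::take(&mut self.buffered);
        self.update_reservation();
        batch
    }

    /// New approach: write the remaining rows out as one more sorted run,
    /// so nothing stays in memory outside the reservation.
    fn spill(&mut self) {
        if self.buffered.is_empty() {
            return;
        }
        let mut run = std::mem::take(&mut self.buffered);
        run.sort_unstable();
        self.spills.push(run);
        self.update_reservation();
    }
}

/// Bytes held in memory but not covered by the reservation after emitting.
fn untracked_bytes_after_emit(rows: Vec<u64>) -> usize {
    let mut state = AggState::new(rows);
    let batch = state.emit_all();
    batch.len() * size_of::<u64>() - state.reservation.reserved
}

/// Bytes held in memory but not covered by the reservation after spilling.
fn untracked_bytes_after_spill(rows: Vec<u64>) -> usize {
    let mut state = AggState::new(rows);
    state.spill();
    state.buffered_bytes() - state.reservation.reserved
}
```

With three buffered `u64` rows, the emit path leaves 24 bytes live but unreserved, while the spill path leaves none. The price is one extra spill file, which is the performance trade-off mentioned above.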
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]