kosiew commented on code in PR #20559:
URL: https://github.com/apache/datafusion/pull/20559#discussion_r2925163741
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1028,36 +1028,57 @@ impl GroupedHashAggregateStream {
Ok(None)
}
OutOfMemoryMode::EmitEarly if self.group_values.len() > 1 => {
- let n = if self.group_values.len() >= self.batch_size {
- // Try to emit an integer multiple of batch size if possible
- self.group_values.len() / self.batch_size * self.batch_size
- } else {
- // Otherwise emit whatever we can
- self.group_values.len()
- };
-
// Clamp to the sort boundary when using partial group ordering,
// otherwise remove_groups panics (#20445).
- let n = match &self.group_ordering {
- GroupOrdering::None => n,
- _ => match self.group_ordering.emit_to() {
- Some(EmitTo::First(max)) => n.min(max),
- _ => 0,
- },
- };
-
- if n > 0
- && let Some(batch) = self.emit(EmitTo::First(n), false)?
+ if let Some(emit_to) = self.emit_target_for_oom()
+ && let Some(batch) = self.emit(emit_to, false)?
{
- Ok(Some(ExecutionState::ProducingOutput(batch)))
- } else {
- Err(oom)
+ return Ok(Some(ExecutionState::ProducingOutput(batch)));
}
+ Err(oom)
}
- _ => Err(oom),
+ OutOfMemoryMode::EmitEarly
+ | OutOfMemoryMode::Spill
+ | OutOfMemoryMode::ReportError => Err(oom),
}
}
+ /// Returns the number of groups that can be emitted to avoid an
+ /// out-of-memory condition.
+ ///
+ /// Returns `None` if emitting is not possible.
+ ///
+ /// Returns `Some(EmitTo)` with the number of groups to emit if it is possible
+ /// to emit some groups to free memory.
+ fn emit_target_for_oom(&self) -> Option<EmitTo> {
Review Comment:
emit_target_for_oom is a nice extraction, but it mixes two different
concerns:
- the generic OOM batching policy ("how many rows would I like to emit?"), and
- the GroupOrdering rules ("which emit strategies are legal given the current
ordering?"), where GroupOrdering::None has its own special case.

I think renaming the helper and tweaking its docs (e.g. oom_emit_to, documented
as "returns the emit strategy to use under OOM") would clarify this.
Alternatively, move the ordering-aware clamping logic onto GroupOrdering (or
into aggregates/order/mod.rs).
That way row_hash.rs only decides how much it wants to emit under memory
pressure, and the ordering module enforces its invariants. This keeps the
rules in one place and makes the helper reusable for other emit paths.
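
To make the second option concrete, here is a rough, self-contained sketch of the split. It does not use the real DataFusion types: `EmitTo` and `GroupOrdering` below are simplified stand-ins, and `clamp_emit`, `desired_oom_emit_count`, and the `sort_boundary` field are hypothetical names invented for illustration. The `batch_size > 0` guard is also my addition and is not in the PR.

```rust
/// Simplified stand-in for DataFusion's `EmitTo` (only the variant used here).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EmitTo {
    First(usize),
}

/// Simplified stand-in for `GroupOrdering`. The real enum carries ordering
/// state; `sort_boundary` is a hypothetical field modelling how many leading
/// groups are complete, or `None` when nothing can be emitted yet.
enum GroupOrdering {
    None,
    Ordered { sort_boundary: Option<usize> },
}

impl GroupOrdering {
    /// Hypothetical method: clamp a desired emit count to what the ordering
    /// allows. Returns `None` when no groups can legally be emitted.
    fn clamp_emit(&self, desired: usize) -> Option<EmitTo> {
        let n = match self {
            // No ordering constraint: any prefix of the groups may be emitted.
            GroupOrdering::None => desired,
            // With an ordering, never emit past the sort boundary.
            GroupOrdering::Ordered { sort_boundary } => {
                desired.min(sort_boundary.unwrap_or(0))
            }
        };
        (n > 0).then_some(EmitTo::First(n))
    }
}

/// Generic OOM batching policy: prefer an integer multiple of `batch_size`,
/// otherwise emit whatever is available. This function knows nothing about
/// group ordering.
fn desired_oom_emit_count(group_count: usize, batch_size: usize) -> usize {
    if batch_size > 0 && group_count >= batch_size {
        group_count / batch_size * batch_size
    } else {
        group_count
    }
}
```

With this shape, the OOM path in row_hash.rs would reduce to something like `ordering.clamp_emit(desired_oom_emit_count(len, batch_size))`, and other emit paths could reuse `clamp_emit` without duplicating the ordering rules.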
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]