alamb commented on code in PR #20446:
URL: https://github.com/apache/datafusion/pull/20446#discussion_r2843283788
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1036,7 +1036,19 @@ impl GroupedHashAggregateStream {
self.group_values.len()
};
- if let Some(batch) = self.emit(EmitTo::First(n), false)? {
+ // Clamp to the sort boundary when using partial group
ordering,
+ // otherwise remove_groups panics (#20445).
+ let n = match &self.group_ordering {
Review Comment:
I found this logic quite confusing, including the use of `0` as a sentinel.
Could we maybe encapsulate it into a method? That way we could at least
explain what it is doing better.
I tried a bunch of different formulations, and the best I could come up with
was:
```rust
impl GroupedHashAggregateStream {
...
// Clamp to the sort boundary when using partial group ordering,
// otherwise remove_groups panics (#20445).
if let Some(emit_to) = self.emit_target_for_oom() {
if let Some(batch) = self.emit(EmitTo::First(n), false)?
{
return
Ok(Some(ExecutionState::ProducingOutput(batch)))
}
}
...
/// Returns how many groups to try and emit in order to avoid an
out-of-memory
/// condition.
///
/// Returns `None` if emitting is not possible.
///
/// Returns Some(EmitTo) with the number of groups to emit if it is
possible
/// to emit some groups to free memory
fn emit_target_for_oom(&self) -> Option<EmitTo> {
let n = if self.group_values.len() >= self.batch_size {
// Try to emit an integer multiple of batch size if possible
self.group_values.len() / self.batch_size * self.batch_size
} else {
// Otherwise emit whatever we can
self.group_values.len()
};
// Special case for GroupOrdering::None since emit_to() returns None
for
// that case, but we can still emit some groups to try to resolve
the OOM
if matches!(&self.group_ordering, GroupOrdering::None) {
return Some(EmitTo::First(n));
};
self.group_ordering.emit_to()
.map(|emit_to| match emit_to {
// If the ordering allows emitting some groups,
// emit as many as we can to try to resolve the OOM,
EmitTo::First(max)=> EmitTo::First(n.min(max)),
// if the ordering allows emitting all groups, we can emit n
// groups to try to resolve the OOM
EmitTo::All => EmitTo::First(n),
})
}
...
}
```
Here is the entire proposed diff
```diff
index 35f32ac7a..5bd33aab5 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -1038,26 +1038,50 @@ impl GroupedHashAggregateStream {
// Clamp to the sort boundary when using partial group
ordering,
// otherwise remove_groups panics (#20445).
- let n = match &self.group_ordering {
- GroupOrdering::None => n,
- _ => match self.group_ordering.emit_to() {
- Some(EmitTo::First(max)) => n.min(max),
- _ => 0,
- },
- };
-
- if n > 0
- && let Some(batch) = self.emit(EmitTo::First(n), false)?
- {
- Ok(Some(ExecutionState::ProducingOutput(batch)))
- } else {
- Err(oom)
+ if let Some(emit_to) = self.emit_target_for_oom() {
+ if let Some(batch) = self.emit(EmitTo::First(n),
false)? {
+ return
Ok(Some(ExecutionState::ProducingOutput(batch)));
+ }
}
+
+ Err(oom)
}
_ => Err(oom),
}
}
+ /// Returns how many groups to try and emit in order to avoid an
out-of-memory
+ /// condition.
+ ///
+ /// Returns `None` if emitting is not possible.
+ ///
+ /// Returns Some(EmitTo) with the number of groups to emit if it is
possible
+ /// to emit some groups to free memory
+ fn emit_target_for_oom(&self) -> Option<EmitTo> {
+ let n = if self.group_values.len() >= self.batch_size {
+ // Try to emit an integer multiple of batch size if possible
+ self.group_values.len() / self.batch_size * self.batch_size
+ } else {
+ // Otherwise emit whatever we can
+ self.group_values.len()
+ };
+
+ // Special case for GroupOrdering::None since emit_to() returns
None for
+ // that case, but we can still emit some groups to try to resolve
the OOM
+ if matches!(&self.group_ordering, GroupOrdering::None) {
+ return Some(EmitTo::First(n));
+ };
+
+ self.group_ordering.emit_to().map(|emit_to| match emit_to {
+ // If the ordering allows emitting some groups,
+ // emit as many as we can to try to resolve the OOM,
+ EmitTo::First(max) => EmitTo::First(n.min(max)),
+ // if the ordering allows emitting all groups, we can emit n
+ // groups to try to resolve the OOM
+ EmitTo::All => EmitTo::First(n),
+ })
+ }
+
```
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1036,7 +1036,19 @@ impl GroupedHashAggregateStream {
self.group_values.len()
};
- if let Some(batch) = self.emit(EmitTo::First(n), false)? {
+ // Clamp to the sort boundary when using partial group
ordering,
+ // otherwise remove_groups panics (#20445).
+ let n = match &self.group_ordering {
+ GroupOrdering::None => n,
+ _ => match self.group_ordering.emit_to() {
+ Some(EmitTo::First(max)) => n.min(max),
+ _ => 0,
Review Comment:
Why not change this to emit n groups on `Some(EmitTo::All)`?
```rust
_ => match self.group_ordering.emit_to() {
Some(EmitTo::First(max)) => n.min(max),
Some(EmitTo::All) => n,
_ => 0,
```
As I understand what it is doing, we are trying to ensure at most n groups
are emitted
(this is incorporated above as well)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]