pepijnve commented on code in PR #19287:
URL: https://github.com/apache/datafusion/pull/19287#discussion_r2625098061
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -550,26 +569,37 @@ impl GroupedHashAggregateStream {
.collect::<Vec<_>>()
.join(", ");
let name = format!("GroupedHashAggregateStream[{partition}] ({agg_fn_names})");
- let reservation = MemoryConsumer::new(name)
- .with_can_spill(true)
- .register(context.memory_pool());
let group_ordering = GroupOrdering::try_new(&agg.input_order_mode)?;
+ let oom_mode = match group_ordering {
+ GroupOrdering::None => {
Review Comment:
`GroupOrdering::None` does indeed mean that the input is not ordered. The
consequence is that you need to have seen all input before you can emit an
aggregation value for a group.
The exception to this is partial aggregation, where the subsequent final
aggregation will merge everything together properly. If both conditions hold
(i.e. ordering is None and the aggregation is partial), then an OOM condition
is allowed to forcibly emit group values early, even though not all values for
that group may have been seen yet.
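To make the rule concrete, here is a minimal sketch of that decision. The
names `InputOrder`, `Stage`, `OomAction` and `oom_action` are hypothetical
stand-ins for illustration, not the types used in the PR, and the sketch
deliberately says nothing about what happens in the cases where early
emission is not allowed.

```rust
/// Hypothetical stand-in for the ordering of the group keys in the input.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InputOrder {
    None,
    Partial,
    Full,
}

/// Hypothetical stand-in for the aggregation stage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Stage {
    /// A later final aggregation merges whatever this stage emits.
    Partial,
    /// Output is the final result, so each group must be complete.
    Final,
}

/// Hypothetical result type: may an OOM condition force groups out early?
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OomAction {
    EmitEarly,
    NoEarlyEmit,
}

/// Early emission on OOM is only safe for unordered input in a partial
/// aggregation, because the final stage re-merges groups that were split.
fn oom_action(order: InputOrder, stage: Stage) -> OomAction {
    match (order, stage) {
        (InputOrder::None, Stage::Partial) => OomAction::EmitEarly,
        _ => OomAction::NoEarlyEmit,
    }
}
```

With these assumptions, `oom_action(InputOrder::None, Stage::Partial)` is the
only combination that yields `OomAction::EmitEarly`.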
If the group ordering is either Partial or Full, then there is a separate
code path that already tries to emit completed groups even if there isn't an
OOM condition.
This is also why the mode is flipped to Full when merging the sorted spill
files. We know we're reading them back and merging them in fully sorted order,
so we can make use of the emission behaviour of `GroupOrdering::Full` to push
out group values as soon as possible.
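As a rough illustration of why fully sorted input allows early emission, the
sketch below sums values per key and emits a group the moment the key changes.
The function `sum_sorted` and its callback are made up for this example and
are not part of DataFusion; it assumes the input is already sorted by key.

```rust
/// Sums values per key over input that is sorted by key (an assumption of
/// this sketch). A group is handed to `emit` as soon as the next key differs,
/// so at most one group is held in memory at a time.
fn sum_sorted(
    input: impl IntoIterator<Item = (u32, i64)>,
    mut emit: impl FnMut(u32, i64),
) {
    // The group currently being accumulated, if any.
    let mut current: Option<(u32, i64)> = None;
    for (key, value) in input {
        match current.take() {
            // Same key as before: keep accumulating.
            Some((k, sum)) if k == key => current = Some((k, sum + value)),
            // Key changed: the previous group is complete, emit it now.
            Some((k, sum)) => {
                emit(k, sum);
                current = Some((key, value));
            }
            // First row of the input.
            None => current = Some((key, value)),
        }
    }
    // End of input completes the last group.
    if let Some((k, sum)) = current {
        emit(k, sum);
    }
}
```

With unordered input the same function would emit a key more than once, which
is exactly the situation that only a partial aggregation can tolerate.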
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]