pepijnve commented on code in PR #19287:
URL: https://github.com/apache/datafusion/pull/19287#discussion_r2630297031
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -550,26 +571,39 @@ impl GroupedHashAggregateStream {
.collect::<Vec<_>>()
.join(", ");
let name = format!("GroupedHashAggregateStream[{partition}]
({agg_fn_names})");
- let reservation = MemoryConsumer::new(name)
- .with_can_spill(true)
- .register(context.memory_pool());
let group_ordering = GroupOrdering::try_new(&agg.input_order_mode)?;
+ let oom_mode = match agg.mode {
+ // In partial aggregation mode, always prefer to emit incomplete
results early.
+ AggregateMode::Partial => OutOfMemoryMode::EmitEarly,
+ _ => match group_ordering {
+ // For non-partial aggregation modes, don't use spilling if
the input
+ // of fully sorted by the grouping expressions. Regular
emission of completed
+ // group values will handle memory pressure.
+ GroupOrdering::Full(_) => OutOfMemoryMode::ReportError,
+ // For unsorted or partially sorted inputs, use disk spilling
+ GroupOrdering::None | GroupOrdering::Partial(_) =>
OutOfMemoryMode::Spill,
+ },
+ };
Review Comment:
Yes, that's better indeed. The matrix of possible outcomes is easier to see
that way.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]