pepijnve commented on code in PR #19287:
URL: https://github.com/apache/datafusion/pull/19287#discussion_r2635485288
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1115,62 +1156,30 @@ impl GroupedHashAggregateStream {
}
/// Clear memory and shirk capacities to the size of the batch.
- fn clear_shrink(&mut self, batch: &RecordBatch) {
- self.group_values.clear_shrink(batch);
+ fn clear_shrink(&mut self, num_rows: usize) {
+ self.group_values.clear_shrink(num_rows);
self.current_group_indices.clear();
- self.current_group_indices.shrink_to(batch.num_rows());
+ self.current_group_indices.shrink_to(num_rows);
}
/// Clear memory and shirk capacities to zero.
fn clear_all(&mut self) {
- let s = self.schema();
- self.clear_shrink(&RecordBatch::new_empty(s));
- }
-
- /// Emit if the used memory exceeds the target for partial aggregation.
- /// Currently only [`GroupOrdering::None`] is supported for early emitting.
- /// TODO: support group_ordering for early emitting
- ///
- /// Returns `Some(ExecutionState)` if the state should be changed, None
otherwise.
- fn emit_early_if_necessary(&mut self) -> Result<Option<ExecutionState>> {
- if self.group_values.len() >= self.batch_size
- && matches!(self.group_ordering, GroupOrdering::None)
- && self.update_memory_reservation().is_err()
- {
- assert_eq!(self.mode, AggregateMode::Partial);
- let n = self.group_values.len() / self.batch_size *
self.batch_size;
- if let Some(batch) = self.emit(EmitTo::First(n), false)? {
- return Ok(Some(ExecutionState::ProducingOutput(batch)));
- };
- }
- Ok(None)
+ self.clear_shrink(0);
}
/// At this point, all the inputs are read and there are some spills.
/// Emit the remaining rows and create a batch.
/// Conduct a streaming merge sort between the batch and spilled data.
Since the stream is fully
/// sorted, set `self.group_ordering` to Full, then later we can read with
[`EmitTo::First`].
Review Comment:
In `external_aggr.rs`, the query plan for Q1 is
```
ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
CoalescePartitionsExec
AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
ProjectionExec: expr=[]
AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as
l_orderkey, l_suppkey@1 as l_suppkey], aggr=[]
RepartitionExec: partitioning=Hash([l_orderkey@0, l_suppkey@1],
4), input_partitions=4
AggregateExec: mode=Partial, gby=[l_orderkey@0 as l_orderkey,
l_suppkey@1 as l_suppkey], aggr=[]
DataSourceExec: ...
```
I've confirmed with the debugger that a mix of `OutOfMemoryMode::EmitEarly`
and `OutOfMemoryMode::Spill` is used. So that benchmark is definitely useful to
measure the performance impact of changes in this area of the code.
@alamb Is it worth trying to get it in working state again on `main` with
only minimal changes first? My appetite for that work is rather limited unless
it's considered a blocker for this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]