viirya commented on code in PR #7400:
URL: https://github.com/apache/arrow-datafusion/pull/7400#discussion_r1326538408
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -466,15 +625,122 @@ impl GroupedHashAggregateStream {
for acc in self.accumulators.iter_mut() {
match self.mode {
AggregateMode::Partial => output.extend(acc.state(emit_to)?),
+                _ if spilling => {
+                    // If spilling, output partial state because the spilled data will be
+                    // merged and re-evaluated later.
+                    output.extend(acc.state(emit_to)?)
+                }
AggregateMode::Final
| AggregateMode::FinalPartitioned
| AggregateMode::Single
| AggregateMode::SinglePartitioned =>
output.push(acc.evaluate(emit_to)?),
}
}
- self.update_memory_reservation()?;
- let batch = RecordBatch::try_new(self.schema(), output)?;
+        // emit reduces the memory usage. Ignore Err from update_memory_reservation. Even if it is
+        // over the target memory size after emission, we can emit again rather than returning Err.
+ let _ = self.update_memory_reservation();
+ let batch = RecordBatch::try_new(schema, output)?;
Ok(batch)
}
+
+    /// Optimistically, [`Self::group_aggregate_batch`] allows exceeding the memory target
+    /// slightly (~ 1 [`RecordBatch`]) for simplicity. In such cases, spill the data to disk and
+    /// clear the memory. Currently only [`GroupOrdering::None`] is supported for spilling.
+    fn spill_previous_if_necessary(&mut self, batch: &RecordBatch) -> Result<()> {
+ // TODO: support group_ordering for spilling
+ if self.group_values.len() > 0
+ && batch.num_rows() > 0
+ && matches!(self.group_ordering, GroupOrdering::None)
+ && !matches!(self.mode, AggregateMode::Partial)
+ && !self.spill_state.is_stream_merging
+ && self.update_memory_reservation().is_err()
+ {
+ // Use input batch (Partial mode) schema for spilling because
+ // the spilled data will be merged and re-evaluated later.
+ self.spill_state.spill_schema = batch.schema();
+ self.spill()?;
+ self.clear_shrink(batch);
+ }
+ Ok(())
+ }
+
+ /// Emit all rows, sort them, and store them on disk.
+ fn spill(&mut self) -> Result<()> {
+ let emit = self.emit(EmitTo::All, true)?;
+ let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?;
Review Comment:
Hmm, firstly, the spilled data is partial aggregation states, right? What this tries to do is spill them and read them back later, after all inputs are done (i.e., all partial aggregation states have been either aggregated or spilled). The merged stream of spilled batches is read in as new input, so the rows are continuously aggregated into the final aggregation result.

Since the partial aggregation inputs should be unordered (this is hash aggregation, not sort-based aggregation), I wonder why we need to sort them before spilling. It doesn't matter which spilled batches the rows with `a = 2` end up in; once those rows are read back, they are aggregated as expected.
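To make the question concrete, here is a minimal sketch in plain std Rust (not DataFusion's actual types; `spill_files`, the `(key, value)` batch layout, and SUM as the aggregate are all illustrative assumptions) of the flow described above: partially aggregate, "spill" the partial states, then read them back and re-aggregate with a hash table.

```rust
use std::collections::HashMap;

fn main() {
    // Input rows: (group key, value). Two "batches" arriving over time.
    let batches = vec![
        vec![(2_i64, 10_i64), (1, 5), (2, 7)],
        vec![(1, 3), (2, 1), (3, 4)],
    ];

    // Stand-in for spill files on disk: each entry is one spilled batch of
    // partial states (group key -> partial SUM).
    let mut spill_files: Vec<Vec<(i64, i64)>> = Vec::new();

    // Phase 1: partially aggregate each batch, then "spill" the partial
    // states and clear memory, like emit(EmitTo::All, true) followed by
    // writing to disk.
    for batch in &batches {
        let mut partial: HashMap<i64, i64> = HashMap::new();
        for &(key, value) in batch {
            *partial.entry(key).or_insert(0) += value;
        }
        spill_files.push(partial.into_iter().collect());
    }

    // Phase 2: read the spilled partial states back as new input and merge
    // them into the final result. Because this pass hashes on the group key
    // again, rows with the same key may come from any spill file in any
    // order without affecting correctness.
    let mut final_agg: HashMap<i64, i64> = HashMap::new();
    for spilled_batch in spill_files {
        for (key, partial_sum) in spilled_batch {
            *final_agg.entry(key).or_insert(0) += partial_sum;
        }
    }

    let mut result: Vec<(i64, i64)> = final_agg.into_iter().collect();
    result.sort();
    assert_eq!(result, vec![(1, 8), (2, 18), (3, 4)]);
    println!("{result:?}");
}
```

The hash-based merge in this sketch is order-insensitive, which is the crux of the question; if the sort before spilling instead exists to enable a bounded-memory streaming merge of the spill files (rather than re-hashing everything), that rationale isn't obvious from the snippet under review.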