pepijnve commented on code in PR #19287:
URL: https://github.com/apache/datafusion/pull/19287#discussion_r2634068010
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1115,62 +1156,30 @@ impl GroupedHashAggregateStream {
}
/// Clear memory and shirk capacities to the size of the batch.
- fn clear_shrink(&mut self, batch: &RecordBatch) {
- self.group_values.clear_shrink(batch);
+ fn clear_shrink(&mut self, num_rows: usize) {
+ self.group_values.clear_shrink(num_rows);
self.current_group_indices.clear();
- self.current_group_indices.shrink_to(batch.num_rows());
+ self.current_group_indices.shrink_to(num_rows);
}
/// Clear memory and shirk capacities to zero.
fn clear_all(&mut self) {
- let s = self.schema();
- self.clear_shrink(&RecordBatch::new_empty(s));
- }
-
- /// Emit if the used memory exceeds the target for partial aggregation.
- /// Currently only [`GroupOrdering::None`] is supported for early emitting.
- /// TODO: support group_ordering for early emitting
- ///
- /// Returns `Some(ExecutionState)` if the state should be changed, None otherwise.
- fn emit_early_if_necessary(&mut self) -> Result<Option<ExecutionState>> {
- if self.group_values.len() >= self.batch_size
- && matches!(self.group_ordering, GroupOrdering::None)
- && self.update_memory_reservation().is_err()
- {
- assert_eq!(self.mode, AggregateMode::Partial);
- let n = self.group_values.len() / self.batch_size * self.batch_size;
- if let Some(batch) = self.emit(EmitTo::First(n), false)? {
- return Ok(Some(ExecutionState::ProducingOutput(batch)));
- };
- }
- Ok(None)
+ self.clear_shrink(0);
}
/// At this point, all the inputs are read and there are some spills.
/// Emit the remaining rows and create a batch.
/// Conduct a streaming merge sort between the batch and spilled data. Since the stream is fully
/// sorted, set `self.group_ordering` to Full, then later we can read with [`EmitTo::First`].
Review Comment:
I ended up deciding to inline `update_merged_stream` into
`set_input_done_and_produce_output`. I couldn't come up with a sensible name
for it, and found the code clearer when the two alternatives in
`set_input_done_and_produce_output` are close together.
I've added additional comments in `set_input_done_and_produce_output` to
explain what's being done there and why.
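For readers skimming the hunk above, here is a minimal sketch of what the `clear_shrink(num_rows)` signature change amounts to. It is not the DataFusion code: `Scratch` is a hypothetical stand-in that holds only a `Vec`, and the real method also forwards to `self.group_values.clear_shrink(num_rows)`. It only shows that a row count is enough to drive `Vec::clear` plus `Vec::shrink_to`, so `clear_all` no longer has to build an empty `RecordBatch`.

```rust
/// Hypothetical stand-in for the stream's per-batch scratch state.
/// (Assumption: only the index buffer is modelled here.)
struct Scratch {
    current_group_indices: Vec<usize>,
}

impl Scratch {
    /// Drop all elements, then release capacity above `num_rows`.
    fn clear_shrink(&mut self, num_rows: usize) {
        self.current_group_indices.clear();
        // `shrink_to` keeps capacity at least `max(len, num_rows)`.
        self.current_group_indices.shrink_to(num_rows);
    }

    /// Same as shrinking towards zero rows; no empty batch is needed.
    fn clear_all(&mut self) {
        self.clear_shrink(0);
    }
}

fn main() {
    let mut s = Scratch {
        current_group_indices: (0..8192).collect(),
    };
    let before = s.current_group_indices.capacity();

    s.clear_shrink(1024);
    assert!(s.current_group_indices.is_empty());
    // Capacity is a lower bound only; the allocator may keep some excess.
    assert!(s.current_group_indices.capacity() >= 1024);
    assert!(s.current_group_indices.capacity() <= before);

    s.clear_all();
    assert!(s.current_group_indices.is_empty());
}
```

The assertions check only lower and upper bounds on capacity, since `shrink_to` does not promise an exact value.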