tustvold commented on code in PR #7025:
URL: https://github.com/apache/arrow-datafusion/pull/7025#discussion_r1268175366
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -342,69 +332,92 @@ impl Stream for GroupedHashAggregateStream {
let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
loop {
- let exec_state = self.exec_state.clone();
- match exec_state {
+ let res = match self.exec_state.clone() {
ExecutionState::ReadingInput => {
- match ready!(self.input.poll_next_unpin(cx)) {
- // new batch to aggregate
- Some(Ok(batch)) => {
- let timer = elapsed_compute.timer();
- // Do the grouping
- extract_ok!(self.group_aggregate_batch(batch));
-
- // If we can begin emitting rows, do so,
- // otherwise keep consuming input
- assert!(!self.input_done);
- let to_emit = self.group_ordering.emit_to();
-
- if let Some(to_emit) = to_emit {
- let batch =
-
extract_ok!(self.create_batch_from_map(to_emit));
- self.exec_state =
ExecutionState::ProducingOutput(batch);
- }
- timer.done();
- }
- Some(Err(e)) => {
- // inner had error, return to caller
- return Poll::Ready(Some(Err(e)));
- }
- None => {
- // inner is done, emit all rows and switch to
producing output
- self.input_done = true;
- self.group_ordering.input_done();
- let timer = elapsed_compute.timer();
- let batch =
-
extract_ok!(self.create_batch_from_map(EmitTo::All));
- self.exec_state =
ExecutionState::ProducingOutput(batch);
- timer.done();
- }
- }
+ let input = ready!(self.input.poll_next_unpin(cx));
+ self.next_input(input, &elapsed_compute)
}
ExecutionState::ProducingOutput(batch) => {
- // slice off a part of the batch, if needed
- let output_batch = if batch.num_rows() <= self.batch_size {
- if self.input_done {
- self.exec_state = ExecutionState::Done;
- } else {
- self.exec_state = ExecutionState::ReadingInput
- }
- batch
- } else {
- // output first batch_size rows
- let num_remaining = batch.num_rows() - self.batch_size;
- let remaining = batch.slice(self.batch_size,
num_remaining);
- self.exec_state =
ExecutionState::ProducingOutput(remaining);
- batch.slice(0, self.batch_size)
- };
- return Poll::Ready(Some(Ok(
- output_batch.record_output(&self.baseline_metrics)
- )));
+ let output_batch = self.next_output(batch);
+ return Poll::Ready(Some(Ok(output_batch)));
}
ExecutionState::Done => return Poll::Ready(None),
+ };
+
+ // return error, otherwise loop again
+ if let Err(e) = res {
+ return Poll::Ready(Some(Err(e)));
+ }
+ }
+ }
+}
+
+impl GroupedHashAggregateStream {
+ /// Processes the next batch of input, updating
+ /// self.execution_state appropriately
+ fn next_input(
+ &mut self,
+ input: Option<Result<RecordBatch>>,
+ elapsed_compute: &Time,
+ ) -> Result<()> {
+ assert!(matches!(&self.exec_state, ExecutionState::ReadingInput));
+ match input {
+ // new batch to aggregate
+ Some(batch) => {
+ let batch = batch?;
+ let timer = elapsed_compute.timer();
+ // Do the grouping
+ self.group_aggregate_batch(batch)?;
+
+ // If we can begin emitting rows, do so,
+ // otherwise keep consuming input
+ assert!(!self.input_done);
+ let to_emit = self.group_ordering.emit_to();
+
+ if let Some(to_emit) = to_emit {
+ let batch = self.create_batch_from_map(to_emit)?;
+ self.exec_state = ExecutionState::ProducingOutput(batch);
+ }
+ timer.done();
+ }
+ None => {
+ // inner is done, emit all rows and switch to producing output
+ self.input_done = true;
+ self.group_ordering.input_done();
+ let timer = elapsed_compute.timer();
+ let batch = self.create_batch_from_map(EmitTo::All)?;
+ self.exec_state = ExecutionState::ProducingOutput(batch);
+ timer.done();
}
}
+ Ok(())
+ }
+
+ /// Produces the next batch of output from
ExecutionState::ProductingOuptut, updating self.execution_state
Review Comment:
```suggestion
/// Produces the next batch of output from
ExecutionState::ProducingOuptut, updating self.execution_state
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]