alamb commented on code in PR #13794:
URL: https://github.com/apache/datafusion/pull/13794#discussion_r1887372503
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -693,9 +696,12 @@ impl Stream for GroupedHashAggregateStream {
}
if let Some(to_emit) =
self.group_ordering.emit_to() {
- let batch = extract_ok!(self.emit(to_emit,
false));
- self.exec_state =
ExecutionState::ProducingOutput(batch);
timer.done();
+ let Some(batch) =
extract_ok!(self.emit(to_emit, false))
Review Comment:
same comment as above
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -937,7 +944,8 @@ impl GroupedHashAggregateStream {
// over the target memory size after emission, we can emit again
rather than returning Err.
let _ = self.update_memory_reservation();
let batch = RecordBatch::try_new(schema, output)?;
- Ok(batch)
+ debug_assert!(batch.num_rows() > 0);
Review Comment:
Maybe it would be good to document somewhere in comments the expectation /
behavior that no empty record batches are produced.
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -654,9 +654,12 @@ impl Stream for GroupedHashAggregateStream {
}
if let Some(to_emit) =
self.group_ordering.emit_to() {
- let batch = extract_ok!(self.emit(to_emit,
false));
- self.exec_state =
ExecutionState::ProducingOutput(batch);
timer.done();
+ let Some(batch) =
extract_ok!(self.emit(to_emit, false))
+ else {
+ break 'reading_input;
+ };
+ self.exec_state =
ExecutionState::ProducingOutput(batch);
Review Comment:
Nit: I think I would find this easier to read if it avoided the redundant
`break`:
```suggestion
if let let Some(batch) =
extract_ok!(self.emit(to_emit, false)) {
self.exec_state =
ExecutionState::ProducingOutput(batch);
}
```
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -902,14 +909,14 @@ impl GroupedHashAggregateStream {
/// Create an output RecordBatch with the group keys and
/// accumulator states/values specified in emit_to
- fn emit(&mut self, emit_to: EmitTo, spilling: bool) -> Result<RecordBatch>
{
+ fn emit(&mut self, emit_to: EmitTo, spilling: bool) ->
Result<Option<RecordBatch>> {
let schema = if spilling {
Arc::clone(&self.spill_state.spill_schema)
} else {
self.schema()
};
if self.group_values.is_empty() {
- return Ok(RecordBatch::new_empty(schema));
+ return Ok(None);
Review Comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]