milenkovicm commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1024085041
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -138,11 +167,78 @@ impl GroupedHashAggregateStreamV2 {
aggr_layout,
baseline_metrics,
aggregate_expressions,
- aggr_state: Default::default(),
+ aggr_state,
random_state: Default::default(),
batch_size,
row_group_skip_position: 0,
- })
+ };
+
+ let stream = futures::stream::unfold(inner, |mut this| async move {
+ let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+ loop {
+ let result: ArrowResult<Option<RecordBatch>> =
+ match this.input.next().await {
+ Some(Ok(batch)) => {
+ let timer = elapsed_compute.timer();
+ let result = group_aggregate_batch(
+ &this.mode,
+ &this.random_state,
+ &this.group_by,
+ &mut this.accumulators,
+ &this.group_schema,
+ this.aggr_layout.clone(),
+ batch,
+ &mut this.aggr_state,
+ &this.aggregate_expressions,
+ )
+ .await;
+
+ timer.done();
+
+ match result {
+ Ok(_) => continue,
Review Comment:
IMHO, this would be the place to do something like:
```rust
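Ok(_) => {
    // `data_size_before_batch` is assumed to be captured before the
    // `group_aggregate_batch` call above.
    let new_data_size = this.aggr_state.get_current_size();
    let acquired = this.memory_manager.can_grow_directly(
        new_data_size - data_size_before_batch,
        data_size_before_batch,
    );
    if !acquired {
        this.aggr_state.spill();
        // The growth was not granted, so only the pre-batch size is tracked.
        this.memory_manager.record_free_then_acquire(data_size_before_batch, 0);
    }
    continue;
}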
```
We basically assume that `group_aggregate_batch` can get all the memory it
needs, so there is no need to interact with the memory manager per row.
This would decouple processing from accounting.
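
To make the idea concrete, here is a minimal, self-contained sketch of per-batch accounting. It does not use DataFusion's types: `MemoryBudget`, `AggrState`, `ENTRY_SIZE`, `process_batches` and `demo` are all hypothetical stand-ins, the size estimate is a fixed number of bytes per group, and "spilling" just clears the map instead of writing anything to disk.

```rust
use std::collections::HashMap;

/// Assumed fixed cost of one group entry, in bytes (illustrative only).
const ENTRY_SIZE: usize = 16;

/// Hypothetical stand-in for a memory manager: one consumer, one fixed limit.
struct MemoryBudget {
    limit: usize,
    used: usize,
}

impl MemoryBudget {
    fn new(limit: usize) -> Self {
        Self { limit, used: 0 }
    }

    /// Reserve `additional` bytes if they fit; reserve nothing otherwise.
    fn try_grow(&mut self, additional: usize) -> bool {
        if self.used + additional <= self.limit {
            self.used += additional;
            true
        } else {
            false
        }
    }

    /// Return `bytes` to the budget.
    fn free(&mut self, bytes: usize) {
        self.used = self.used.saturating_sub(bytes);
    }
}

/// Toy aggregation state: a running sum per group key.
#[derive(Default)]
struct AggrState {
    sums: HashMap<u64, i64>,
    spill_count: usize,
}

impl AggrState {
    fn current_size(&self) -> usize {
        self.sums.len() * ENTRY_SIZE
    }

    /// Processing step: never talks to the budget.
    fn update(&mut self, batch: &[(u64, i64)]) {
        for &(key, value) in batch {
            *self.sums.entry(key).or_insert(0) += value;
        }
    }

    /// Placeholder spill: a real operator would write the state out first.
    fn spill(&mut self) {
        self.sums.clear();
        self.spill_count += 1;
    }
}

/// Accounting happens once per batch, after the batch has been processed.
fn process_batches(batches: &[Vec<(u64, i64)>], budget: &mut MemoryBudget) -> AggrState {
    let mut state = AggrState::default();
    for batch in batches {
        let size_before = state.current_size();
        state.update(batch);
        let growth = state.current_size().saturating_sub(size_before);
        if !budget.try_grow(growth) {
            // Growth was not granted, so only `size_before` is tracked.
            state.spill();
            budget.free(size_before);
        }
    }
    state
}

/// Small driver: a budget of four entries and four batches.
fn demo() -> (usize, usize, usize) {
    let batches = vec![
        vec![(1, 10), (2, 20), (3, 30)],
        vec![(3, 5), (4, 40)],
        vec![(5, 1), (6, 2)],
        vec![(7, 7)],
    ];
    let mut budget = MemoryBudget::new(4 * ENTRY_SIZE);
    let state = process_batches(&batches, &mut budget);
    (state.spill_count, state.sums.len(), budget.used)
}
```

In this sketch the third batch pushes the state past the budget, so the state is spilled once and the fourth batch starts from an empty map. The trade-off is that a single batch can temporarily overshoot the limit, since the check only runs after the batch has been aggregated.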