crepererum commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1024099248
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -138,11 +167,78 @@ impl GroupedHashAggregateStreamV2 {
aggr_layout,
baseline_metrics,
aggregate_expressions,
- aggr_state: Default::default(),
+ aggr_state,
random_state: Default::default(),
batch_size,
row_group_skip_position: 0,
- })
+ };
+
+ let stream = futures::stream::unfold(inner, |mut this| async move {
+ let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+ loop {
+ let result: ArrowResult<Option<RecordBatch>> =
+ match this.input.next().await {
+ Some(Ok(batch)) => {
+ let timer = elapsed_compute.timer();
+ let result = group_aggregate_batch(
+ &this.mode,
+ &this.random_state,
+ &this.group_by,
+ &mut this.accumulators,
+ &this.group_schema,
+ this.aggr_layout.clone(),
+ batch,
+ &mut this.aggr_state,
+ &this.aggregate_expressions,
+ )
+ .await;
+
+ timer.done();
+
+ match result {
+ Ok(_) => continue,
Review Comment:
The interaction with the memory manager is not per row; it's per batch. I can
place the accounting here. The code you propose is basically the same as what
currently runs, just inlined (it's the default impl of `MemoryConsumer::try_grow`).
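
To illustrate what "per batch" means here, a rough sketch follows. It is not the DataFusion API: `ToyConsumer`, `aggregate_batch`, `run`, and `BYTES_PER_GROUP` are hypothetical names made up for this example, and the fixed per-group size is an assumption. The only point is that the reservation call happens once per processed batch, after the batch has been aggregated, no matter how many rows the batch holds.

```rust
use std::collections::HashMap;

/// Assumed fixed cost of one group entry (hypothetical; real state sizes vary).
const BYTES_PER_GROUP: usize = 16;

/// Hypothetical stand-in for a memory consumer; NOT the real DataFusion trait.
#[derive(Debug)]
struct ToyConsumer {
    limit: usize,
    used: usize,
    /// Number of times the consumer was asked to grow.
    calls: usize,
}

impl ToyConsumer {
    fn new(limit: usize) -> Self {
        ToyConsumer { limit, used: 0, calls: 0 }
    }

    /// Reserve `required` more bytes, failing if the limit would be exceeded.
    fn try_grow(&mut self, required: usize) -> Result<(), String> {
        self.calls += 1;
        let new_used = self
            .used
            .checked_add(required)
            .ok_or_else(|| "size overflow".to_string())?;
        if new_used > self.limit {
            return Err(format!("limit {} exceeded: {}", self.limit, new_used));
        }
        self.used = new_used;
        Ok(())
    }
}

/// Fold one batch of (group key, value) rows into the state and return
/// how many bytes the state grew by.
fn aggregate_batch(state: &mut HashMap<u64, u64>, batch: &[(u64, u64)]) -> usize {
    let groups_before = state.len();
    for &(key, value) in batch {
        *state.entry(key).or_insert(0) += value;
    }
    (state.len() - groups_before) * BYTES_PER_GROUP
}

/// Process all batches, doing the memory accounting once per batch.
fn run(batches: &[Vec<(u64, u64)>], limit: usize) -> Result<ToyConsumer, String> {
    let mut consumer = ToyConsumer::new(limit);
    let mut state = HashMap::new();
    for batch in batches {
        let allocated = aggregate_batch(&mut state, batch);
        // One accounting call per batch, regardless of row count.
        consumer.try_grow(allocated)?;
    }
    Ok(consumer)
}
```

With two batches, `run` calls `try_grow` exactly twice, and a limit that is too small surfaces as an `Err` at the batch that crosses it.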