milenkovicm commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1024181640


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -138,11 +167,78 @@ impl GroupedHashAggregateStreamV2 {
             aggr_layout,
             baseline_metrics,
             aggregate_expressions,
-            aggr_state: Default::default(),
+            aggr_state,
             random_state: Default::default(),
             batch_size,
             row_group_skip_position: 0,
-        })
+        };
+
+        let stream = futures::stream::unfold(inner, |mut this| async move {
+            let elapsed_compute = this.baseline_metrics.elapsed_compute();
+
+            loop {
+                let result: ArrowResult<Option<RecordBatch>> =
+                    match this.input.next().await {
+                        Some(Ok(batch)) => {
+                            let timer = elapsed_compute.timer();
+                            let result = group_aggregate_batch(
+                                &this.mode,
+                                &this.random_state,
+                                &this.group_by,
+                                &mut this.accumulators,
+                                &this.group_schema,
+                                this.aggr_layout.clone(),
+                                batch,
+                                &mut this.aggr_state,
+                                &this.aggregate_expressions,
+                            )
+                            .await;
+
+                            timer.done();
+
+                            match result {
+                                Ok(_) => continue,

Review Comment:
   Apologies, you're right @crepererum, it is per batch.
   
   The reason why I believe moving it out makes sense is separation of concerns, but it's up to you.
   
   For example, at line 363:
   
   ```rust
           // allocate memory
        // This happens AFTER we actually used the memory, but simplifies the
        // whole accounting and we are OK with overshooting a bit. Also this
        // means we either store the whole record batch or not.
           memory_consumer.alloc(allocated).await?;
   ```
   Can this trigger a spill? And will the state be consistent if a spill is triggered? My guess is that it will not be. It might be implementation specific, but it's hard to tell without understanding the memory management implementation and the store implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
