ozankabak commented on code in PR #4972:
URL: https://github.com/apache/arrow-datafusion/pull/4972#discussion_r1081247158


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -219,91 +221,76 @@ impl GroupedHashAggregateStream {
             batch_size,
             row_group_skip_position: 0,
             indices: [normal_agg_indices, row_agg_indices],
-        };
+        })
+    }
+}
 
-        let stream = futures::stream::unfold(inner, |mut this| async move {
-            let elapsed_compute = this.baseline_metrics.elapsed_compute();
+impl Stream for GroupedHashAggregateStream {
+    type Item = ArrowResult<RecordBatch>;
 
-            loop {
-                let result: ArrowResult<Option<RecordBatch>> =
-                    match this.input.next().await {
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
+
+        loop {
+            match self.exec_state {
+                ExecutionState::ReadingInput => {
+                    match ready!(self.input.poll_next_unpin(cx)) {
+                        // new batch to aggregate
                         Some(Ok(batch)) => {
                             let timer = elapsed_compute.timer();
-                            let result = group_aggregate_batch(
-                                &this.mode,
-                                &this.random_state,
-                                &this.group_by,
-                                &this.normal_aggr_expr,
-                                &mut this.row_accumulators,
-                                &mut this.row_converter,
-                                this.row_aggr_layout.clone(),
-                                batch,
-                                &mut this.row_aggr_state,
-                                &this.normal_aggregate_expressions,
-                                &this.row_aggregate_expressions,
-                            );
-
+                            let result = self.group_aggregate_batch(batch);
                             timer.done();
 
                             // allocate memory
                             // This happens AFTER we actually used the memory, 
but simplifies the whole accounting and we are OK with
                             // overshooting a bit. Also this means we either 
store the whole record batch or not.
                             match result.and_then(|allocated| {
-                                
this.row_aggr_state.reservation.try_grow(allocated)
+                                
self.row_aggr_state.reservation.try_grow(allocated)
                             }) {
-                                Ok(_) => continue,
-                                Err(e) => 
Err(ArrowError::ExternalError(Box::new(e))),
+                                Ok(_) => {}
+                                Err(e) => {
+                                    return Poll::Ready(Some(Err(
+                                        ArrowError::ExternalError(Box::new(e)),
+                                    )))
+                                }
                             }
                         }

Review Comment:
   Since the `Ok` arm is a no-op, an `if let Err(e) = ...` would be more 
idiomatic here than a full `match`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to