jorgecarleitao commented on a change in pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#discussion_r508230895



##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -331,72 +337,74 @@ impl GroupedHashAggregateIterator {
 }
 
 type AccumulatorSet = Vec<Box<dyn Accumulator>>;
+type Accumulators = FnvHashMap<Vec<GroupByScalar>, (AccumulatorSet, Box<Vec<u32>>)>;
 
-impl Iterator for GroupedHashAggregateIterator {
+impl Stream for GroupedHashAggregateStream {
     type Item = ArrowResult<RecordBatch>;
 
-    fn next(&mut self) -> Option<Self::Item> {
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
         if self.finished {
-            return None;
+            return Poll::Ready(None);
         }
 
         // return single batch
         self.finished = true;
 
-        let mode = &self.mode;
-        let group_expr = &self.group_expr;
-        let aggr_expr = &self.aggr_expr;
+        let mode = self.mode.clone();
+        let group_expr = self.group_expr.clone();
+        let aggr_expr = self.aggr_expr.clone();
+        let schema = self.schema.clone();
 
         // the expressions to evaluate the batch, one vec of expressions per aggregation
         let aggregate_expressions = match aggregate_expressions(&aggr_expr, &mode) {
             Ok(e) => e,
-            Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
+            Err(e) => {
+                return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error(
+                    e,
+                ))))
+            }
         };
 
         // mapping key -> (set of accumulators, indices of the key in the batch)
         // * the indexes are updated at each row
         // * the accumulators are updated at the end of each batch
         // * the indexes are `clear`ed at the end of each batch
-        let mut accumulators: FnvHashMap<
-            Vec<GroupByScalar>,
-            (AccumulatorSet, Box<Vec<u32>>),
-        > = FnvHashMap::default();
+        //let mut accumulators: Accumulators = FnvHashMap::default();
 
         // iterate over all input batches and update the accumulators
-        match self
-            .input
-            .as_mut()
-            .into_iter()
-            .map(|batch| {
+        let future = self.input.as_mut().try_fold(
+            Accumulators::default(),
+            |accumulators, batch| async {
                 group_aggregate_batch(
                     &mode,
                     &group_expr,
                     &aggr_expr,
-                    &batch?,
-                    &mut accumulators,
+                    batch,
+                    accumulators,
                     &aggregate_expressions,
                 )
                 .map_err(ExecutionError::into_arrow_external_error)
-            })
-            .collect::<ArrowResult<()>>()
-        {
-            Err(e) => return Some(Err(e)),
-            Ok(_) => {}
-        }
+            },
+        );
 
-        Some(
-            create_batch_from_map(
-                &self.mode,
-                &accumulators,
-                self.group_expr.len(),
-                &self.schema,
-            )
-            .map_err(ExecutionError::into_arrow_external_error),
-        )
+        let future = future.map(|accumulators| match accumulators {
+            Ok(accumulators) => {
+                create_batch_from_map(&mode, &accumulators, group_expr.len(), &schema)
+            }
+            Err(e) => Err(e),
+        });

Review comment:
       Good point. Fixed (and below).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to