alamb commented on a change in pull request #8480:
URL: https://github.com/apache/arrow/pull/8480#discussion_r506512980



##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -210,13 +260,60 @@ Example: average
 * Once all N record batches arrive, `merge` is performed, which builds a 
RecordBatch with N rows and 2 columns.
 * Finally, `get_value` returns an array with one entry computed from the state
 */
-struct GroupedHashAggregateIterator {
-    mode: AggregateMode,
-    schema: SchemaRef,
+async fn grouped_aggregate(
+    input: DynFutureRecordBatchIterator,
     group_expr: Vec<Arc<dyn PhysicalExpr>>,
     aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    input: SendableRecordBatchReader,
-    finished: bool,
+    schema: SchemaRef,
+    mode: AggregateMode,
+) -> ArrowResult<RecordBatch> {
+    // the expressions to evaluate the batch, one vec of expressions per 
aggregation
+    let aggregate_expressions = aggregate_expressions(&aggr_expr, &mode)
+        .map_err(|e| ExecutionError::into_arrow_external_error(e))?;
+
+    // mapping key -> (set of accumulators, indices of the key in the batch)
+    // * the indexes are updated at each row
+    // * the accumulators are updated at the end of each batch
+    // * the indexes are `clear`ed at the end of each batch
+    let accumulators: FnvHashMap<Vec<GroupByScalar>, (AccumulatorSet, 
Box<Vec<u32>>)> =
+        FnvHashMap::default();
+
+    // this will be shared by multiple threads, which will update the 
accumulator as required.
+    // todo: a mutex over all groups is _brutal_: this requires more care
+    let accumulators = Arc::new(Mutex::new(accumulators));
+    // place under an arc to avoid cloning vectors
+    let aggregate_expressions = Arc::new(aggregate_expressions);
+
+    let futures = input.map(|future_batch| {
+        // send each aggregation to its own thread
+        let accumulators = accumulators.clone();
+        let aggr_expr = aggr_expr.clone();
+        let aggregate_expressions = aggregate_expressions.clone();
+        let group_expr = group_expr.clone();
+        tokio::spawn(async move {
+            let batch = future_batch.await?;
+            let mut accumulators = accumulators.lock().unwrap();
+            group_aggregate_batch(
+                &mode,
+                &group_expr,
+                &aggr_expr,
+                &batch,
+                &mut accumulators,
+                &aggregate_expressions,
+            )
+            .map_err(ExecutionError::into_arrow_external_error)
+        })
+    });
+
+    // parallel computation of the aggregation
+    try_join_all(futures)

Review comment:
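       This pattern has a downside: the entire input to the hash aggregate may end up
buffered in memory if the aggregate consumes data more slowly than the input produces it.
Every incoming batch gets its own spawned task, and `try_join_all` drives all of those tasks
at once. Each task then holds its batch while it waits on the single shared mutex.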




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to