alamb commented on code in PR #4371:
URL: https://github.com/apache/arrow-datafusion/pull/4371#discussion_r1032911880


##########
datafusion/core/src/physical_plan/aggregates/hash.rs:
##########
@@ -326,10 +402,20 @@ fn group_aggregate_batch(
                         )
                     })
                     .try_for_each(|(accumulator, values)| match mode {
-                        AggregateMode::Partial => 
accumulator.update_batch(&values),
+                        AggregateMode::Partial => {
+                            let size_pre = accumulator.size();

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/aggregates/hash.rs:
##########
@@ -326,10 +402,20 @@ fn group_aggregate_batch(
                         )
                     })
                     .try_for_each(|(accumulator, values)| match mode {
-                        AggregateMode::Partial => 
accumulator.update_batch(&values),
+                        AggregateMode::Partial => {
+                            let size_pre = accumulator.size();

Review Comment:
   You might also consider pulling the size accounting to before/after the 
`match` to avoid the duplication



##########
datafusion/core/src/physical_plan/aggregates/hash.rs:
##########
@@ -108,18 +125,92 @@ impl GroupedHashAggregateStream {
 
         timer.done();
 
-        Ok(Self {
-            schema,
+        let inner = GroupedHashAggregateStreamInner {
+            schema: Arc::clone(&schema),
             mode,
             input,
             aggr_expr,
             group_by,
             baseline_metrics,
             aggregate_expressions,
-            accumulators: Default::default(),
+            accumulators: Accumulators {
+                memory_consumer: MemoryConsumerProxy::new(
+                    "Accumulators",
+                    MemoryConsumerId::new(partition),
+                    Arc::clone(&context.runtime_env().memory_manager),
+                ),
+                map: RawTable::with_capacity(0),
+                group_states: Vec::with_capacity(0),
+            },
             random_state: Default::default(),
             finished: false,
-        })
+        };
+
+        let stream = futures::stream::unfold(inner, |mut this| async move {

Review Comment:
   TIL https://docs.rs/futures/0.3.25/futures/stream/fn.unfold.html



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -347,41 +411,8 @@ impl ExecutionPlan for AggregateExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let batch_size = context.session_config().batch_size();
-        let input = self.input.execute(partition, Arc::clone(&context))?;
-
-        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
-
-        if self.group_by.expr.is_empty() {
-            Ok(Box::pin(AggregateStream::new(
-                self.mode,
-                self.schema.clone(),
-                self.aggr_expr.clone(),
-                input,
-                baseline_metrics,
-            )?))
-        } else if self.row_aggregate_supported() {
-            Ok(Box::pin(GroupedHashAggregateStreamV2::new(
-                self.mode,
-                self.schema.clone(),
-                self.group_by.clone(),
-                self.aggr_expr.clone(),
-                input,
-                baseline_metrics,
-                batch_size,
-                context,
-                partition,
-            )?))
-        } else {
-            Ok(Box::pin(GroupedHashAggregateStream::new(
-                self.mode,
-                self.schema.clone(),
-                self.group_by.clone(),
-                self.aggr_expr.clone(),
-                input,
-                baseline_metrics,
-            )?))
-        }
+        self.execute_typed(partition, context)

Review Comment:
   👍 



##########
datafusion/core/src/physical_plan/aggregates/hash.rs:
##########
@@ -257,12 +313,32 @@ fn group_aggregate_batch(
                         accumulator_set,
                         indices: vec![row as u32], // 1.3
                     };
+                    // NOTE: do NOT include the `GroupState` struct size in 
here because this is captured by
+                    // `group_states` (see allocation down below)
+                    allocated += group_state

Review Comment:
   Figuring out how to encapsulate some of this accounting (so it wasn't 
inlined into the code) would make it easier to maintain I think. But I don't 
think that is required



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to