jayzhan211 commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1714933499


##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -798,42 +838,64 @@ impl GroupedHashAggregateStream {
 
     /// Create an output RecordBatch with the group keys and
     /// accumulator states/values specified in emit_to
-    fn emit(&mut self, emit_to: EmitTo, spilling: bool) -> Result<RecordBatch> 
{
+    fn emit(
+        &mut self,
+        group_emit: EmitTo,
+        state_emit: EmitTo,
+        spilling: bool,
+    ) -> Result<VecDeque<RecordBatch>> {
+        let merge_mode = MergeMode::new(group_emit, state_emit)?;
+
         let schema = if spilling {
             Arc::clone(&self.spill_state.spill_schema)
         } else {
             self.schema()
         };
+
         if self.group_values.is_empty() {
-            return Ok(RecordBatch::new_empty(schema));
+            return Ok(VecDeque::from([RecordBatch::new_empty(schema)]));
         }
 
-        let mut output = self.group_values.emit(emit_to)?;
-        if let EmitTo::First(n) = emit_to {
+        let mut outputs = self.group_values.emit(group_emit)?;
+        if let EmitTo::First(n) = group_emit {
             self.group_ordering.remove_groups(n);
         }
 
         // Next output each aggregate value
         for acc in self.accumulators.iter_mut() {
             match self.mode {
-                AggregateMode::Partial => output.extend(acc.state(emit_to)?),
+                AggregateMode::Partial => {
+                    let states = acc.state(state_emit)?;
+                    merge_mode.merge_groups_and_partial_states(&mut outputs, 
states);
+                }
                 _ if spilling => {
                     // If spilling, output partial state because the spilled 
data will be
                     // merged and re-evaluated later.
-                    output.extend(acc.state(emit_to)?)
+                    let states = acc.state(state_emit)?;
+                    merge_mode.merge_groups_and_partial_states(&mut outputs, 
states);
                 }
                 AggregateMode::Final
                 | AggregateMode::FinalPartitioned
                 | AggregateMode::Single
-                | AggregateMode::SinglePartitioned => 
output.push(acc.evaluate(emit_to)?),
+                | AggregateMode::SinglePartitioned => {
+                    let state = acc.evaluate(state_emit)?;
+                    merge_mode.merge_groups_and_final_states(&mut outputs, 
state);
+                }
             }
         }
 
         // emit reduces the memory usage. Ignore Err from 
update_memory_reservation. Even if it is
         // over the target memory size after emission, we can emit again 
rather than returning Err.
         let _ = self.update_memory_reservation();
-        let batch = RecordBatch::try_new(schema, output)?;
-        Ok(batch)
+        let batches = outputs

Review Comment:
   ```suggestion
         outputs
               .into_iter()
               .map(|o| {
                   RecordBatch::try_new(Arc::clone(&schema), o)
                       .map_err(|e| DataFusionError::ArrowError(e, None))
               })
               .collect::<Result<VecDeque<_>>>()
   ```



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -798,42 +838,64 @@ impl GroupedHashAggregateStream {
 
     /// Create an output RecordBatch with the group keys and
     /// accumulator states/values specified in emit_to
-    fn emit(&mut self, emit_to: EmitTo, spilling: bool) -> Result<RecordBatch> 
{
+    fn emit(
+        &mut self,
+        group_emit: EmitTo,
+        state_emit: EmitTo,
+        spilling: bool,
+    ) -> Result<VecDeque<RecordBatch>> {
+        let merge_mode = MergeMode::new(group_emit, state_emit)?;
+
         let schema = if spilling {
             Arc::clone(&self.spill_state.spill_schema)
         } else {
             self.schema()
         };
+
         if self.group_values.is_empty() {
-            return Ok(RecordBatch::new_empty(schema));
+            return Ok(VecDeque::from([RecordBatch::new_empty(schema)]));
         }
 
-        let mut output = self.group_values.emit(emit_to)?;
-        if let EmitTo::First(n) = emit_to {
+        let mut outputs = self.group_values.emit(group_emit)?;
+        if let EmitTo::First(n) = group_emit {
             self.group_ordering.remove_groups(n);
         }
 
         // Next output each aggregate value
         for acc in self.accumulators.iter_mut() {
             match self.mode {
-                AggregateMode::Partial => output.extend(acc.state(emit_to)?),
+                AggregateMode::Partial => {
+                    let states = acc.state(state_emit)?;
+                    merge_mode.merge_groups_and_partial_states(&mut outputs, 
states);
+                }
                 _ if spilling => {
                     // If spilling, output partial state because the spilled 
data will be
                     // merged and re-evaluated later.
-                    output.extend(acc.state(emit_to)?)
+                    let states = acc.state(state_emit)?;
+                    merge_mode.merge_groups_and_partial_states(&mut outputs, 
states);
                 }
                 AggregateMode::Final
                 | AggregateMode::FinalPartitioned
                 | AggregateMode::Single
-                | AggregateMode::SinglePartitioned => 
output.push(acc.evaluate(emit_to)?),
+                | AggregateMode::SinglePartitioned => {
+                    let state = acc.evaluate(state_emit)?;
+                    merge_mode.merge_groups_and_final_states(&mut outputs, 
state);
+                }
             }
         }
 
         // emit reduces the memory usage. Ignore Err from 
update_memory_reservation. Even if it is
         // over the target memory size after emission, we can emit again 
rather than returning Err.
         let _ = self.update_memory_reservation();
-        let batch = RecordBatch::try_new(schema, output)?;
-        Ok(batch)
+        let batches = outputs
+            .into_iter()
+            .map(|o| {
+                RecordBatch::try_new(Arc::clone(&schema), o)
+                    .map_err(|e| DataFusionError::ArrowError(e, None))

Review Comment:
   Consider using the error-construction macro (e.g. `arrow_datafusion_err!`) here instead of manually mapping the error with `DataFusionError::ArrowError(e, None)`.



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -353,7 +355,7 @@ pub(crate) struct GroupedHashAggregateStream {
 
     /// scratch space for the current input [`RecordBatch`] being
     /// processed. Reused across batches here to avoid reallocations
-    current_group_indices: Vec<usize>,
+    current_group_indices: Vec<u64>,

Review Comment:
   What is the reason to use `u64` instead of `usize` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to