kazuyukitanimura commented on code in PR #7400:
URL: https://github.com/apache/arrow-datafusion/pull/7400#discussion_r1326524013


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -466,15 +625,122 @@ impl GroupedHashAggregateStream {
         for acc in self.accumulators.iter_mut() {
             match self.mode {
                 AggregateMode::Partial => output.extend(acc.state(emit_to)?),
+                _ if spilling => {
+                    // If spilling, output partial state because the spilled 
data will be
+                    // merged and re-evaluated later.
+                    output.extend(acc.state(emit_to)?)
+                }
                 AggregateMode::Final
                 | AggregateMode::FinalPartitioned
                 | AggregateMode::Single
                 | AggregateMode::SinglePartitioned => 
output.push(acc.evaluate(emit_to)?),
             }
         }
 
-        self.update_memory_reservation()?;
-        let batch = RecordBatch::try_new(self.schema(), output)?;
+        // emit reduces the memory usage. Ignore Err from 
update_memory_reservation. Even if it is
+        // over the target memory size after emission, we can emit again 
rather than returning Err.
+        let _ = self.update_memory_reservation();
+        let batch = RecordBatch::try_new(schema, output)?;
         Ok(batch)
     }
+
+    /// Optimistically, [`Self::group_aggregate_batch`] allows to exceed the 
memory target slightly
+    /// (~ 1 [`RecordBatch`]) for simplicity. In such cases, spill the data to 
disk and clear the
+    /// memory. Currently only [`GroupOrdering::None`] is supported for 
spilling.
+    fn spill_previous_if_necessary(&mut self, batch: &RecordBatch) -> 
Result<()> {
+        // TODO: support group_ordering for spilling
+        if self.group_values.len() > 0
+            && batch.num_rows() > 0
+            && matches!(self.group_ordering, GroupOrdering::None)
+            && !matches!(self.mode, AggregateMode::Partial)
+            && !self.spill_state.is_stream_merging
+            && self.update_memory_reservation().is_err()
+        {
+            // Use input batch (Partial mode) schema for spilling because
+            // the spilled data will be merged and re-evaluated later.
+            self.spill_state.spill_schema = batch.schema();
+            self.spill()?;
+            self.clear_shrink(batch);
+        }
+        Ok(())
+    }
+
+    /// Emit all rows, sort them, and store them on disk.
+    fn spill(&mut self) -> Result<()> {
+        let emit = self.emit(EmitTo::All, true)?;
+        let sorted = sort_batch(&emit, &self.spill_state.spill_expr, None)?;

Review Comment:
   Using the example of final aggregation in the doc
   
https://github.com/apache/arrow-datafusion/pull/7400/files#diff-0b72cdc7a5e02b6650430c3eb7bdea4f1d2d32867b927633d92abb4fcf31c81bR187-R202
   We have to re-group the rows with `a = 2`. Note that rows with `a = 2` can 
appear in any of the spilled batches.
   
   The spills happened because memory was insufficient to hold all rows. The 
only remaining choice is merge-based aggregation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to