This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 94dac76e79 GroupedHashAggregateStream breaks spill batch (#8004)
94dac76e79 is described below

commit 94dac76e79380aad9adc781e0c89c2db56d96301
Author: Marko Milenković <[email protected]>
AuthorDate: Wed Nov 1 16:20:38 2023 +0000

    GroupedHashAggregateStream breaks spill batch (#8004)
    
    ... into smaller chunks to decrease memory required for merging.
---
 datafusion/physical-plan/src/aggregates/mod.rs      |  2 +-
 datafusion/physical-plan/src/aggregates/row_hash.rs | 11 ++++++++++-
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 4c61222317..da152a6264 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -2155,7 +2155,7 @@ mod tests {
         spill: bool,
     ) -> Result<()> {
         let task_ctx = if spill {
-            new_spill_ctx(2, 2812)
+            new_spill_ctx(2, 2886)
         } else {
             Arc::new(TaskContext::default())
         };
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index d773533ad6..7b66088584 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -673,7 +673,16 @@ impl GroupedHashAggregateStream {
         let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
         let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
         // TODO: slice large `sorted` and write to multiple files in parallel
-        writer.write(&sorted)?;
+        let mut offset = 0;
+        let total_rows = sorted.num_rows();
+
+        while offset < total_rows {
+            let length = std::cmp::min(total_rows - offset, self.batch_size);
+            let batch = sorted.slice(offset, length);
+            offset += batch.num_rows();
+            writer.write(&batch)?;
+        }
+
         writer.finish()?;
         self.spill_state.spills.push(spillfile);
         Ok(())

Reply via email to