This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 94dac76e79 GroupedHashAggregateStream breaks spill batch (#8004)
94dac76e79 is described below
commit 94dac76e79380aad9adc781e0c89c2db56d96301
Author: Marko Milenković <[email protected]>
AuthorDate: Wed Nov 1 16:20:38 2023 +0000
GroupedHashAggregateStream breaks spill batch (#8004)
... into smaller chunks to decrease memory required for merging.
---
datafusion/physical-plan/src/aggregates/mod.rs | 2 +-
datafusion/physical-plan/src/aggregates/row_hash.rs | 11 ++++++++++-
2 files changed, 11 insertions(+), 2 deletions(-)
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 4c61222317..da152a6264 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -2155,7 +2155,7 @@ mod tests {
spill: bool,
) -> Result<()> {
let task_ctx = if spill {
- new_spill_ctx(2, 2812)
+ new_spill_ctx(2, 2886)
} else {
Arc::new(TaskContext::default())
};
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index d773533ad6..7b66088584 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -673,7 +673,16 @@ impl GroupedHashAggregateStream {
let spillfile =
self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
// TODO: slice large `sorted` and write to multiple files in parallel
- writer.write(&sorted)?;
+ let mut offset = 0;
+ let total_rows = sorted.num_rows();
+
+ while offset < total_rows {
+ let length = std::cmp::min(total_rows - offset, self.batch_size);
+ let batch = sorted.slice(offset, length);
+ offset += batch.num_rows();
+ writer.write(&batch)?;
+ }
+
writer.finish()?;
self.spill_state.spills.push(spillfile);
Ok(())