milenkovicm commented on code in PR #8004:
URL: https://github.com/apache/arrow-datafusion/pull/8004#discussion_r1378641007
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -673,7 +673,16 @@ impl GroupedHashAggregateStream {
let spillfile =
self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
// TODO: slice large `sorted` and write to multiple files in parallel
- writer.write(&sorted)?;
+ let mut offset = 0;
+ let total_rows = sorted.num_rows();
+
+ while offset < total_rows {
+ let length = std::cmp::min(total_rows - offset, self.batch_size);
+ let batch = sorted.slice(offset, length);
+ offset += batch.num_rows();
+ writer.write(&batch)?;
+ }
+
Review Comment:
Sorry for the confusion: not smaller files, smaller batches. We still keep
one file per spill method invocation.
The previous implementation wrote just one batch per file; this change
introduces more than one batch per file. The file size should therefore be
similar to the previous implementation, probably even a bit bigger, but the
same file will now contain more than one batch.
The streaming merge will open the same number of files as before, but it
will load smaller batches into memory (with one batch per file, the whole
file would have to be loaded at once).
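
To make the memory argument concrete, here is a minimal standalone sketch.
It uses arrow's `FileWriter`/`FileReader` directly rather than DataFusion's
`IPCWriter` wrapper, and the file path and batch size are made up for
illustration. Writing the large batch as several `batch_size`-row slices
means the reader later decodes one small batch per iteration instead of
materializing the whole file:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let big = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from_iter_values(0..10_000))],
    )?;

    // Write the large batch as many `batch_size`-row batches, mirroring the
    // sliced writes in the PR. `slice()` is zero-copy, so slicing is cheap.
    let batch_size = 1024; // illustrative value, not DataFusion's default
    let file = File::create("/tmp/spill.arrow")?; // hypothetical path
    let mut writer = FileWriter::try_new(file, &schema)?;
    let mut offset = 0;
    while offset < big.num_rows() {
        let len = batch_size.min(big.num_rows() - offset);
        writer.write(&big.slice(offset, len))?;
        offset += len;
    }
    writer.finish()?;

    // Reading back: each iteration decodes only one small batch, so peak
    // memory is roughly `batch_size` rows rather than the whole file.
    let reader = FileReader::try_new(File::open("/tmp/spill.arrow")?, None)?;
    for batch in reader {
        assert!(batch?.num_rows() <= batch_size);
    }
    Ok(())
}
```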
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]