milenkovicm commented on code in PR #8004:
URL: https://github.com/apache/arrow-datafusion/pull/8004#discussion_r1378641007


##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -673,7 +673,16 @@ impl GroupedHashAggregateStream {
         let spillfile = self.runtime.disk_manager.create_tmp_file("HashAggSpill")?;
         let mut writer = IPCWriter::new(spillfile.path(), &emit.schema())?;
         // TODO: slice large `sorted` and write to multiple files in parallel
-        writer.write(&sorted)?;
+        let mut offset = 0;
+        let total_rows = sorted.num_rows();
+
+        while offset < total_rows {
+            let length = std::cmp::min(total_rows - offset, self.batch_size);
+            let batch = sorted.slice(offset, length);
+            offset += batch.num_rows();
+            writer.write(&batch)?;
+        }
+

Review Comment:
   Sorry for the confusion: not smaller files, smaller batches. We still keep one file per invocation of the spill method.
   
   The previous implementation wrote just one batch per file; this change introduces more than one batch per file. So the file size should be similar to the previous implementation, probably even a bit bigger, but the same file will now contain more than one batch.
    
   Streaming merge will open the same number of files as before, but it will load smaller batches into memory (with one batch per file, the whole file would be loaded at once).
   


