Jefffrey commented on code in PR #9818:
URL: https://github.com/apache/arrow-datafusion/pull/9818#discussion_r1554517017


##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -475,25 +477,24 @@ impl Stream for GroupedHashAggregateStream {
                     }
                 }
 
-                ExecutionState::ProducingOutput(batch) => {
-                    // slice off a part of the batch, if needed
+                ExecutionState::ProducingOutput(batches) => {
+                    // batches is not empty
+                    let len = batches.len();
+                    assert!(len > 0);
                     let output_batch;
-                    let size = self.batch_size;
-                    (self.exec_state, output_batch) = if batch.num_rows() <= size {
+                    (self.exec_state, output_batch) = if len == 1 {
                         (
                             if self.input_done {
                                 ExecutionState::Done
                             } else {
                                 ExecutionState::ReadingInput
                             },
-                            batch.clone(),
+                            batches[0].clone(),
                         )
                     } else {
-                        // output first batch_size rows
-                        let size = self.batch_size;
-                        let num_remaining = batch.num_rows() - size;
-                        let remaining = batch.slice(size, num_remaining);
-                        let output = batch.slice(0, size);
+                        // output first batches element
+                        let remaining = batches[1..].to_vec();

Review Comment:
   If we push the batches in reverse order (treating `batches` as a stack) and 
pop to get each batch, I think we could avoid calling `to_vec()` multiple times 
(see next comment)
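
   A minimal sketch of the stack idea, not the PR's actual code: `u32` stands 
in for `RecordBatch`, and `State` and `next_output` are hypothetical names that 
only mirror the shape of `ExecutionState`. It assumes the batches are stored in 
reverse order, so the next batch to emit is the last element.

```rust
/// Simplified stand-in for `ExecutionState`; `u32` replaces `RecordBatch`.
#[derive(Debug, PartialEq)]
enum State {
    ReadingInput,
    ProducingOutput(Vec<u32>),
    Done,
}

/// Pops the next batch to emit and returns it with the follow-up state.
/// `batches` must be non-empty and stored in reverse output order.
fn next_output(mut batches: Vec<u32>, input_done: bool) -> (State, u32) {
    // `pop` removes the last element in constant time, with no copying.
    let batch = batches.pop().expect("batches is not empty");
    let state = if !batches.is_empty() {
        // More batches remain: hand the same Vec back, no `to_vec()` needed.
        State::ProducingOutput(batches)
    } else if input_done {
        State::Done
    } else {
        State::ReadingInput
    };
    (state, batch)
}
```

   Because the `Vec` is moved into the next state rather than re-sliced, 
emitting all batches costs one `pop` each instead of one `to_vec()` each.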



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -60,7 +60,7 @@ pub(crate) enum ExecutionState {
     ReadingInput,
     /// When producing output, the remaining rows to output are stored
     /// here and are sliced off as needed in batch_size chunks

Review Comment:
   Might wanna update this comment, since the state now holds a list of 
batches rather than a single batch that is sliced as needed



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -796,4 +799,29 @@ impl GroupedHashAggregateStream {
         timer.done();
         Ok(())
     }
+
+    fn split_batch(&self, batch: RecordBatch) -> Result<Vec<RecordBatch>> {
+        let mut batches = vec![];
+        let mut length = self.batch_size;
+        let len = batch.num_rows();
+        if len == 0 {
+            return Ok(vec![batch]);
+        }
+        for offset in (0..len).step_by(length) {
+            if offset + length > len {
+                length = len - offset;
+            }
+            let slice_columns = batch
+                .columns()
+                .iter()
+                .map(|array| {
+                    let sliced_array = array.slice(offset, length);
+                    sliced_array.to_owned()
+                })
+                .collect();
+            batches.push(RecordBatch::try_new(batch.schema().clone(), slice_columns)?);
+        }

Review Comment:
   I feel this can be simplified by first calculating how many slices you'll 
need (`batch.num_rows() / self.batch_size`, rounded up, or something like 
that), which would allow you to preallocate the `Vec` in advance. You could 
then push the `RecordBatch`es in reverse order, which allows popping in 
constant time (see previous comment)
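
   A rough sketch of that calculation, under stated assumptions: 
`reversed_slice_ranges` is a hypothetical helper, not part of DataFusion, and 
it only computes `(offset, length)` pairs so that it stays self-contained. The 
slice count uses ceiling division so a trailing partial chunk gets its own slot.

```rust
/// Returns `(offset, length)` pairs covering `num_rows` rows in chunks of at
/// most `batch_size` rows. The pairs are in reverse order, so `Vec::pop`
/// yields the chunks front to back.
fn reversed_slice_ranges(num_rows: usize, batch_size: usize) -> Vec<(usize, usize)> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    // Ceiling division: round up so a final partial chunk is counted.
    let num_slices = (num_rows + batch_size - 1) / batch_size;
    // Preallocate once; no reallocation happens while pushing.
    let mut ranges = Vec::with_capacity(num_slices);
    for i in (0..num_slices).rev() {
        let offset = i * batch_size;
        // The last chunk may be shorter than `batch_size`.
        let length = batch_size.min(num_rows - offset);
        ranges.push((offset, length));
    }
    ranges
}
```

   Each pair could then be passed to `RecordBatch::slice(offset, length)`, 
which is zero-copy. Note that this sketch returns an empty `Vec` for zero rows, 
whereas the code under review returns the empty batch itself, so that case 
would still need separate handling.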



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
