alamb commented on code in PR #8112:
URL: https://github.com/apache/arrow-rs/pull/8112#discussion_r2270618563


##########
arrow-select/src/coalesce.rs:
##########
@@ -241,48 +285,56 @@ impl BatchCoalescer {
             return Ok(());
         }
 
-        // setup input rows
+        // set sources
         assert_eq!(arrays.len(), self.in_progress_arrays.len());
         self.in_progress_arrays
             .iter_mut()
             .zip(arrays)
-            .for_each(|(in_progress, array)| {
-                in_progress.set_source(Some(array));
-            });
+            .for_each(|(in_progress, array)| 
in_progress.set_source(Some(array)));
 
-        // If pushing this batch would exceed the target batch size,
-        // finish the current batch and start a new one
         let mut offset = 0;
-        while num_rows > (self.target_batch_size - self.buffered_rows) {
-            let remaining_rows = self.target_batch_size - self.buffered_rows;
-            debug_assert!(remaining_rows > 0);
 
-            // Copy remaining_rows from each array
-            for in_progress in self.in_progress_arrays.iter_mut() {
-                in_progress.copy_rows(offset, remaining_rows)?;
+        if self.exact_size {
+            // Strict: produce exactly target-sized batches (except last)
+            while num_rows > (self.target_batch_size - self.buffered_rows) {
+                let remaining_rows = self.target_batch_size - 
self.buffered_rows;
+                debug_assert!(remaining_rows > 0);
+                for in_progress in self.in_progress_arrays.iter_mut() {
+                    in_progress.copy_rows(offset, remaining_rows)?;
+                }
+                self.buffered_rows += remaining_rows;
+                offset += remaining_rows;
+                num_rows -= remaining_rows;
+                self.finish_buffered_batch()?;
             }
 
-            self.buffered_rows += remaining_rows;
-            offset += remaining_rows;
-            num_rows -= remaining_rows;
-
-            self.finish_buffered_batch()?;
-        }
+            if num_rows > 0 {
+                for in_progress in self.in_progress_arrays.iter_mut() {
+                    in_progress.copy_rows(offset, num_rows)?;
+                }
+                self.buffered_rows += num_rows;
+            }
 
-        // Add any the remaining rows to the buffer
-        self.buffered_rows += num_rows;
-        if num_rows > 0 {
-            for in_progress in self.in_progress_arrays.iter_mut() {
-                in_progress.copy_rows(offset, num_rows)?;
+            // ensure strict invariant: only finish when exactly full
+            if self.buffered_rows >= self.target_batch_size {
+                self.finish_buffered_batch()?;
+            }
+        } else {
+            // Non-strict: append all remaining rows; if buffered >= target, 
emit them
+            if num_rows > 0 {
+                for in_progress in self.in_progress_arrays.iter_mut() {
+                    in_progress.copy_rows(offset, num_rows)?;
+                }
+                self.buffered_rows += num_rows;
             }
-        }
 
-        // If we have reached the target batch size, finalize the buffered 
batch
-        if self.buffered_rows >= self.target_batch_size {
-            self.finish_buffered_batch()?;
+            // If we've reached or exceeded target, emit the whole buffered set

Review Comment:
   If we go over the target size, I think the underlying storage will have to 
reallocate (and thus copy the data).
   
   I think the more performant approach is to emit early whenever adding 
`num_rows` to the output *would* exceed `target_batch_size` (even though some 
of the allocated space is not yet used).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to