alamb commented on code in PR #8112:
URL: https://github.com/apache/arrow-rs/pull/8112#discussion_r2270618563
##########
arrow-select/src/coalesce.rs:
##########
@@ -241,48 +285,56 @@ impl BatchCoalescer {
return Ok(());
}
- // setup input rows
+ // set sources
assert_eq!(arrays.len(), self.in_progress_arrays.len());
self.in_progress_arrays
.iter_mut()
.zip(arrays)
- .for_each(|(in_progress, array)| {
- in_progress.set_source(Some(array));
- });
+ .for_each(|(in_progress, array)|
in_progress.set_source(Some(array)));
- // If pushing this batch would exceed the target batch size,
- // finish the current batch and start a new one
let mut offset = 0;
- while num_rows > (self.target_batch_size - self.buffered_rows) {
- let remaining_rows = self.target_batch_size - self.buffered_rows;
- debug_assert!(remaining_rows > 0);
- // Copy remaining_rows from each array
- for in_progress in self.in_progress_arrays.iter_mut() {
- in_progress.copy_rows(offset, remaining_rows)?;
+ if self.exact_size {
+ // Strict: produce exactly target-sized batches (except last)
+ while num_rows > (self.target_batch_size - self.buffered_rows) {
+ let remaining_rows = self.target_batch_size -
self.buffered_rows;
+ debug_assert!(remaining_rows > 0);
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.copy_rows(offset, remaining_rows)?;
+ }
+ self.buffered_rows += remaining_rows;
+ offset += remaining_rows;
+ num_rows -= remaining_rows;
+ self.finish_buffered_batch()?;
}
- self.buffered_rows += remaining_rows;
- offset += remaining_rows;
- num_rows -= remaining_rows;
-
- self.finish_buffered_batch()?;
- }
+ if num_rows > 0 {
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.copy_rows(offset, num_rows)?;
+ }
+ self.buffered_rows += num_rows;
+ }
- // Add any the remaining rows to the buffer
- self.buffered_rows += num_rows;
- if num_rows > 0 {
- for in_progress in self.in_progress_arrays.iter_mut() {
- in_progress.copy_rows(offset, num_rows)?;
+ // ensure strict invariant: only finish when exactly full
+ if self.buffered_rows >= self.target_batch_size {
+ self.finish_buffered_batch()?;
+ }
+ } else {
+ // Non-strict: append all remaining rows; if buffered >= target,
emit them
+ if num_rows > 0 {
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.copy_rows(offset, num_rows)?;
+ }
+ self.buffered_rows += num_rows;
}
- }
- // If we have reached the target batch size, finalize the buffered
batch
- if self.buffered_rows >= self.target_batch_size {
- self.finish_buffered_batch()?;
+ // If we've reached or exceeded target, emit the whole buffered set
Review Comment:
If we go over the target size, I think the underlying storage will have to
reallocate (and thus copy the data).
I think the more performant approach is: if adding `num_rows` to the
output *would* exceed `target_batch_size`, emit the buffered batch early (even
though some of the allocated space is not yet used).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]