zhuqi-lucas commented on code in PR #8112:
URL: https://github.com/apache/arrow-rs/pull/8112#discussion_r2272476487
##########
arrow-select/src/coalesce.rs:
##########
@@ -241,48 +285,56 @@ impl BatchCoalescer {
return Ok(());
}
- // setup input rows
+ // set sources
assert_eq!(arrays.len(), self.in_progress_arrays.len());
self.in_progress_arrays
.iter_mut()
.zip(arrays)
- .for_each(|(in_progress, array)| {
- in_progress.set_source(Some(array));
- });
+ .for_each(|(in_progress, array)| in_progress.set_source(Some(array)));
- // If pushing this batch would exceed the target batch size,
- // finish the current batch and start a new one
let mut offset = 0;
- while num_rows > (self.target_batch_size - self.buffered_rows) {
- let remaining_rows = self.target_batch_size - self.buffered_rows;
- debug_assert!(remaining_rows > 0);
- // Copy remaining_rows from each array
- for in_progress in self.in_progress_arrays.iter_mut() {
- in_progress.copy_rows(offset, remaining_rows)?;
+ if self.exact_size {
+ // Strict: produce exactly target-sized batches (except last)
+ while num_rows > (self.target_batch_size - self.buffered_rows) {
+ let remaining_rows = self.target_batch_size - self.buffered_rows;
+ debug_assert!(remaining_rows > 0);
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.copy_rows(offset, remaining_rows)?;
+ }
+ self.buffered_rows += remaining_rows;
+ offset += remaining_rows;
+ num_rows -= remaining_rows;
+ self.finish_buffered_batch()?;
}
- self.buffered_rows += remaining_rows;
- offset += remaining_rows;
- num_rows -= remaining_rows;
-
- self.finish_buffered_batch()?;
- }
+ if num_rows > 0 {
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.copy_rows(offset, num_rows)?;
+ }
+ self.buffered_rows += num_rows;
+ }
- // Add any the remaining rows to the buffer
- self.buffered_rows += num_rows;
- if num_rows > 0 {
- for in_progress in self.in_progress_arrays.iter_mut() {
- in_progress.copy_rows(offset, num_rows)?;
+ // ensure strict invariant: only finish when exactly full
+ if self.buffered_rows >= self.target_batch_size {
+ self.finish_buffered_batch()?;
+ }
+ } else {
+ // Non-strict: append all remaining rows; if buffered >= target, emit them
+ if num_rows > 0 {
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.copy_rows(offset, num_rows)?;
+ }
+ self.buffered_rows += num_rows;
}
- }
- // If we have reached the target batch size, finalize the buffered batch
- if self.buffered_rows >= self.target_batch_size {
- self.finish_buffered_batch()?;
+ // If we've reached or exceeded target, emit the whole buffered set
Review Comment:
Hi @alamb, thank you for the review and the good suggestion!
I tried the patch below to do this, but the performance did not improve; the best result in the benchmark still comes from emitting exact-size batches. 🤔
```diff
diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index be2bbcafb6..93bdf86a2d 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -100,16 +100,23 @@ use primitive::InProgressPrimitiveArray;
/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
///
-/// // Non-strict: produce batch once buffered >= target, batch may be larger than target
+/// // Non-strict: optimized for memory efficiency, may emit early to avoid reallocation
/// let mut coalescer = BatchCoalescer::new(batch1.schema(), 4).with_exact_size(false);
/// coalescer.push_batch(batch1).unwrap();
/// // still < 4 rows buffered
/// assert!(coalescer.next_completed_batch().is_none());
/// coalescer.push_batch(batch2).unwrap();
-/// // now buffered >= 4, non-strict mode emits whole buffered set (5 rows)
+/// // buffered=3, new=2, total would be 5 > 4, so emit buffered 3 rows first
/// let finished = coalescer.next_completed_batch().unwrap();
-/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4, 5])).unwrap();
+/// let expected = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// assert_eq!(finished, expected);
+///
+/// // The remaining 2 rows from batch2 are now buffered
+/// assert!(coalescer.next_completed_batch().is_none());
+/// coalescer.finish_buffered_batch().unwrap();
+/// let remaining = coalescer.next_completed_batch().unwrap();
+/// let expected = record_batch!(("a", Int32, [4, 5])).unwrap();
+/// assert_eq!(remaining, expected);
/// ```
///
/// # Background
@@ -145,16 +152,21 @@ use primitive::InProgressPrimitiveArray;
///
/// 1. Output rows are produced in the same order as the input rows
///
-/// 2. The output is a sequence of batches, with all but the last being at exactly
-/// `target_batch_size` rows.
+/// 2. The output batch sizes depend on the `exact_size` setting:
+/// - In strict mode: all but the last batch have exactly `target_batch_size` rows
+/// - In non-strict mode: batch sizes are optimized to avoid memory reallocation
///
/// Notes on `exact_size`:
///
/// - `exact_size == true` (strict): output batches are produced so that all but
/// the final batch have exactly `target_batch_size` rows (default behavior).
-/// - `exact_size == false` (non-strict, default for this crate): output batches
-/// will be produced when the buffered rows are >= `target_batch_size`. The
-/// produced batch may be larger than `target_batch_size` (i.e., size >= target).
+/// - `exact_size == false` (non-strict): output batches are optimized for memory
+/// efficiency. Batches are emitted early to avoid buffer reallocation when adding
+/// new data would exceed the target size. Large input batches are split into
+/// target-sized chunks to prevent excessive memory allocation. This may result in
+/// output batches that are smaller than `target_batch_size`, but the algorithm
+/// ensures batches are as close to the target size as possible while maintaining
+/// memory efficiency. Small batches only occur to avoid costly memory operations.
#[derive(Debug)]
pub struct BatchCoalescer {
/// The input schema
@@ -320,7 +332,29 @@ impl BatchCoalescer {
self.finish_buffered_batch()?;
}
} else {
- // Non-strict: append all remaining rows; if buffered >= target, emit them
+ // Non-strict: emit early if adding num_rows would exceed target to avoid reallocation
+ if self.buffered_rows > 0 && self.buffered_rows + num_rows > self.target_batch_size {
+ // Emit the current buffered data before processing the new batch
+ // This avoids potential reallocation in the underlying storage
+ self.finish_buffered_batch()?;
+ }
+
+ // If num_rows is larger than target_batch_size, split it into target-sized chunks
+ // to avoid allocating overly large buffers
+ while num_rows > self.target_batch_size {
+ let chunk_size = self.target_batch_size;
+ for in_progress in self.in_progress_arrays.iter_mut() {
+ in_progress.copy_rows(offset, chunk_size)?;
+ }
+ self.buffered_rows += chunk_size;
+ offset += chunk_size;
+ num_rows -= chunk_size;
+
+ // Emit this full chunk immediately
+ self.finish_buffered_batch()?;
+ }
+
+ // Now append remaining rows (guaranteed to be <= target_batch_size) to buffer
if num_rows > 0 {
for in_progress in self.in_progress_arrays.iter_mut() {
in_progress.copy_rows(offset, num_rows)?;
@@ -328,7 +362,7 @@ impl BatchCoalescer {
self.buffered_rows += num_rows;
}
- // If we've reached or exceeded target, emit the whole buffered set
+ // If the current buffer has reached or exceeded target, emit it
if self.buffered_rows >= self.target_batch_size {
self.finish_buffered_batch()?;
}
@@ -1381,38 +1415,62 @@ mod tests {
coalescer.push_batch(batch1).unwrap();
assert!(coalescer.next_completed_batch().is_none());
- // push second batch (2 rows) -> buffered becomes 5 >= 4, non-strict emits all 5 rows
+ // push second batch (2 rows) -> buffered=3, new=2, 3+2=5 > 4
+ // NEW BEHAVIOR: emit buffered 3 rows first to avoid reallocation
coalescer.push_batch(batch2).unwrap();
- let out = coalescer
+ let out1 = coalescer
.next_completed_batch()
- .expect("expected a completed batch");
- assert_eq!(out.num_rows(), 5);
-
- // check contents equal to concatenation of 0..5
- let expected = uint32_batch(0..5);
- let actual = normalize_batch(out);
- let expected = normalize_batch(expected);
- assert_eq!(expected, actual);
+ .expect("expected first batch");
+ assert_eq!(out1.num_rows(), 3); // Only the first batch (early emit)
+
+ // The second batch should be buffered now
+ assert!(coalescer.next_completed_batch().is_none());
+
+ // Finish to get the remaining buffered data
+ coalescer.finish_buffered_batch().unwrap();
+ let out2 = coalescer
+ .next_completed_batch()
+ .expect("expected second batch");
+ assert_eq!(out2.num_rows(), 2); // The second batch
+
+ // check contents
+ let expected1 = uint32_batch(0..3);
+ let expected2 = uint32_batch(3..5);
+ assert_eq!(normalize_batch(out1), normalize_batch(expected1));
+ assert_eq!(normalize_batch(out2), normalize_batch(expected2));
}
#[test]
fn test_non_strict_single_large_batch() {
- // one large batch > target: in non-strict mode whole batch should be emitted
+ // one large batch > target: should be split into target-sized chunks
let batch = uint32_batch(0..4096);
let schema = Arc::clone(&batch.schema());
let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 1000).with_exact_size(false);
coalescer.push_batch(batch).unwrap();
- let out = coalescer
- .next_completed_batch()
- .expect("expected a completed batch");
- assert_eq!(out.num_rows(), 4096);
-
- // compare to expected
- let expected = uint32_batch(0..4096);
- let actual = normalize_batch(out);
- let expected = normalize_batch(expected);
- assert_eq!(expected, actual);
+
+ // NEW BEHAVIOR: large batch should be split into chunks of target_batch_size
+ // 4096 / 1000 = 4 full batches + 96 remainder
+ let mut outputs = vec![];
+ while let Some(b) = coalescer.next_completed_batch() {
+ outputs.push(b);
+ }
+
+ assert_eq!(outputs.len(), 4); // 4 full batches emitted immediately
+
+ // Each should be exactly 1000 rows
+ for (i, out) in outputs.iter().enumerate() {
+ assert_eq!(out.num_rows(), 1000);
+ let expected = uint32_batch((i * 1000) as u32..((i + 1) * 1000) as u32);
+ assert_eq!(normalize_batch(out.clone()), normalize_batch(expected));
+ }
+
+ // Remaining 96 rows should be buffered
+ coalescer.finish_buffered_batch().unwrap();
+ let final_batch = coalescer.next_completed_batch().expect("expected final batch");
+ assert_eq!(final_batch.num_rows(), 96);
+ let expected_final = uint32_batch(4000..4096);
+ assert_eq!(normalize_batch(final_batch), normalize_batch(expected_final));
}
#[test]
@@ -1439,71 +1497,104 @@ mod tests {
#[test]
fn test_non_strict_multiple_emits_over_time() {
- // multiple pushes that each eventually push buffered >= target and emit
+ // multiple pushes with early emit behavior
let b1 = uint32_batch(0..3); // 3
- let b2 = uint32_batch(3..5); // 2 -> 3+2=5 emit (first)
- let b3 = uint32_batch(5..8); // 3
- let b4 = uint32_batch(8..10); // 2 -> 3+2=5 emit (second)
+ let b2 = uint32_batch(3..5); // 2 -> 3+2=5 > 4, emit 3 first
+ let b3 = uint32_batch(5..8); // 3 -> 2+3=5 > 4, emit 2 first
+ let b4 = uint32_batch(8..10); // 2 -> 3+2=5 > 4, emit 3 first
let schema = Arc::clone(&b1.schema());
let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 4).with_exact_size(false);
+ // Push first batch (3 rows) -> buffered
coalescer.push_batch(b1).unwrap();
assert!(coalescer.next_completed_batch().is_none());
+ // Push second batch (2 rows) -> 3+2=5 > 4, emit buffered 3 rows first
coalescer.push_batch(b2).unwrap();
let out1 = coalescer
.next_completed_batch()
.expect("expected first batch");
- assert_eq!(out1.num_rows(), 5);
- assert_eq!(normalize_batch(out1), normalize_batch(uint32_batch(0..5)));
+ assert_eq!(out1.num_rows(), 3);
+ assert_eq!(normalize_batch(out1), normalize_batch(uint32_batch(0..3)));
+ // Now 2 rows from b2 are buffered, push b3 (3 rows) -> 2+3=5 > 4, emit 2 rows first
coalescer.push_batch(b3).unwrap();
- assert!(coalescer.next_completed_batch().is_none());
-
- coalescer.push_batch(b4).unwrap();
let out2 = coalescer
.next_completed_batch()
.expect("expected second batch");
- assert_eq!(out2.num_rows(), 5);
- assert_eq!(normalize_batch(out2), normalize_batch(uint32_batch(5..10)));
+ assert_eq!(out2.num_rows(), 2);
+ assert_eq!(normalize_batch(out2), normalize_batch(uint32_batch(3..5)));
+
+ // Now 3 rows from b3 are buffered, push b4 (2 rows) -> 3+2=5 > 4, emit 3 rows first
+ coalescer.push_batch(b4).unwrap();
+ let out3 = coalescer
+ .next_completed_batch()
+ .expect("expected third batch");
+ assert_eq!(out3.num_rows(), 3);
+ assert_eq!(normalize_batch(out3), normalize_batch(uint32_batch(5..8)));
+
+ // Finish to get remaining 2 rows from b4
+ coalescer.finish_buffered_batch().unwrap();
+ let out4 = coalescer
+ .next_completed_batch()
+ .expect("expected fourth batch");
+ assert_eq!(out4.num_rows(), 2);
+ assert_eq!(normalize_batch(out4), normalize_batch(uint32_batch(8..10)));
}
#[test]
fn test_non_strict_large_then_more_outputs() {
- // first push a large batch (should produce one big output), then push more small ones to produce another
+ // first push a large batch (should be split), then push more small ones
let big = uint32_batch(0..5000);
let small1 = uint32_batch(5000..5002); // 2
- let small2 = uint32_batch(5002..5005); // 3 -> 2+3=5 >=4 emit
+ let small2 = uint32_batch(5002..5005); // 3 -> 2+3=5 > 4, emit 2 first
let schema = Arc::clone(&big.schema());
- // Use small target (4) so that small1 + small2 will trigger an emit
let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 4).with_exact_size(false);
- // push big: non-strict mode should emit the whole big batch (5000 rows)
+ // push big: should be split into chunks of 4
+ // 5000 / 4 = 1250 full batches
coalescer.push_batch(big).unwrap();
- let out_big = coalescer
- .next_completed_batch()
- .expect("expected big batch");
- assert_eq!(out_big.num_rows(), 5000);
- assert_eq!(
- normalize_batch(out_big),
- normalize_batch(uint32_batch(0..5000))
- );
- // push small1 (2 rows) -> not enough yet
+ let mut big_outputs = vec![];
+ while let Some(b) = coalescer.next_completed_batch() {
+ big_outputs.push(b);
+ }
+
+ assert_eq!(big_outputs.len(), 1250); // 1250 batches of 4 rows each
+ for (i, out) in big_outputs.iter().enumerate() {
+ assert_eq!(out.num_rows(), 4);
+ let start = i * 4;
+ let end = (i + 1) * 4;
+ let expected = uint32_batch(start as u32..end as u32);
+ assert_eq!(normalize_batch(out.clone()), normalize_batch(expected));
+ }
+
+ // push small1 (2 rows) -> buffered
coalescer.push_batch(small1).unwrap();
assert!(coalescer.next_completed_batch().is_none());
- // push small2 (3 rows) -> now buffered = 2 + 3 = 5 >= 4, non-strict emits all 5 rows
+ // push small2 (3 rows) -> 2+3=5 > 4, emit buffered 2 rows first
coalescer.push_batch(small2).unwrap();
- let out_small = coalescer
+ let out_small1 = coalescer
+ .next_completed_batch()
+ .expect("expected small batch 1");
+ assert_eq!(out_small1.num_rows(), 2);
+ assert_eq!(
+ normalize_batch(out_small1),
+ normalize_batch(uint32_batch(5000..5002))
+ );
+
+ // Finish to get remaining 3 rows from small2
+ coalescer.finish_buffered_batch().unwrap();
+ let out_small2 = coalescer
.next_completed_batch()
- .expect("expected small batch");
- assert_eq!(out_small.num_rows(), 5);
+ .expect("expected small batch 2");
+ assert_eq!(out_small2.num_rows(), 3);
assert_eq!(
- normalize_batch(out_small),
- normalize_batch(uint32_batch(5000..5005))
+ normalize_batch(out_small2),
+ normalize_batch(uint32_batch(5002..5005))
);
}
}
```
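For reference, here is a minimal criterion-style sketch of how the two modes could be compared head to head. It assumes the `with_exact_size` builder from this patch; the `make_batches` helper, column name, batch counts, and target size are illustrative choices only and are not the setup behind the benchmark numbers mentioned above.

```rust
// Minimal sketch only: assumes the `with_exact_size` builder from this patch.
// `make_batches`, the column name, and the sizes are arbitrary illustration values.
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, UInt32Array};
use arrow_schema::{DataType, Field, Schema};
use arrow_select::coalesce::BatchCoalescer;
use criterion::{criterion_group, criterion_main, Criterion};

/// Build `n_batches` single-column UInt32 batches of `rows` rows each.
fn make_batches(n_batches: usize, rows: usize) -> (Arc<Schema>, Vec<RecordBatch>) {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, false)]));
    let batches = (0..n_batches)
        .map(|i| {
            let values =
                UInt32Array::from_iter_values((i * rows..(i + 1) * rows).map(|v| v as u32));
            RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(values) as ArrayRef]).unwrap()
        })
        .collect();
    (schema, batches)
}

/// Push every batch through a coalescer and drain all completed output batches.
fn run_coalesce(schema: &Arc<Schema>, batches: &[RecordBatch], exact_size: bool) {
    // `with_exact_size` is the builder introduced by this PR
    let mut coalescer =
        BatchCoalescer::new(Arc::clone(schema), 8192).with_exact_size(exact_size);
    for batch in batches.iter().cloned() {
        coalescer.push_batch(batch).unwrap();
        while let Some(out) = coalescer.next_completed_batch() {
            std::hint::black_box(out);
        }
    }
    coalescer.finish_buffered_batch().unwrap();
    while let Some(out) = coalescer.next_completed_batch() {
        std::hint::black_box(out);
    }
}

fn benches(c: &mut Criterion) {
    // 64 input batches of 1000 rows coalesced toward a target of 8192 rows (arbitrary sizes)
    let (schema, batches) = make_batches(64, 1000);
    c.bench_function("coalesce exact_size=true", |b| {
        b.iter(|| run_coalesce(&schema, &batches, true))
    });
    c.bench_function("coalesce exact_size=false", |b| {
        b.iter(|| run_coalesce(&schema, &batches, false))
    });
}

criterion_group!(benches_group, benches);
criterion_main!(benches_group);
```

Both runs drain the completed batches eagerly, so the comparison isolates the coalescing path itself rather than any downstream buffering.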
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]