This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
     new e0f9382ea5 feat: support push batch direct to completed and add biggest coalesce batch support (#8146)
e0f9382ea5 is described below

commit e0f9382ea593e0884ba02bed5ef07e78a1fd8fc5
Author: Qi Zhu <821684...@qq.com>
AuthorDate: Tue Aug 19 19:29:28 2025 +0800

    feat: support push batch direct to completed and add biggest coalesce batch support (#8146)

# Which issue does this PR close?

Needed for: https://github.com/apache/datafusion/pull/17193

# Rationale for this change

```rust
// Large batch bypass optimization:
// When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
// we can avoid expensive split-and-merge operations by passing it through directly.
//
// IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
// is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
// If not set (None), ALL batches follow normal coalescing behavior regardless of size.

// =============================================================================
// CASE 1: No buffer + large batch → Direct bypass
// =============================================================================
// Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
// Input sequence: [600, 1200, 300]
//
// With biggest_coalesce_batch_size=Some(500) (optimization enabled):
// 600  → large batch detected! buffered_rows=0 → Case 1: direct bypass
//      → output: [600] (bypass, preserves large batch)
// 1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
//      → output: [1200] (bypass, preserves large batch)
// 300  → normal batch, buffer: [300]
// Result: [600], [1200], [300] - large batches preserved, mixed sizes

// =============================================================================
// CASE 2: Buffer too large + large batch → Flush first, then bypass
// =============================================================================
// This case prevents creating extremely large merged batches that would
// significantly exceed both target_batch_size and biggest_coalesce_batch_size.
//
// Example 1: Buffer exceeds limit before large batch arrives
// target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
// Input: [350, 200, 800]
//
// Step 1: push_batch([350])
//   → batch_size=350 <= 400, normal path
//   → buffer: [350], buffered_rows=350
//
// Step 2: push_batch([200])
//   → batch_size=200 <= 400, normal path
//   → buffer: [350, 200], buffered_rows=550
//
// Step 3: push_batch([800])
//   → batch_size=800 > 400, large batch path
//   → buffered_rows=550 > 400 → Case 2: flush first
//   → flush: output [550] (combined [350, 200])
//   → then bypass: output [800]
// Result: [550], [800] - buffer flushed to prevent oversized merge
//
// Example 2: Multiple small batches accumulate before large batch
// target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
// Input: [150, 100, 80, 900]
//
// Steps 1-3: Accumulate small batches
//   150 → buffer: [150], buffered_rows=150
//   100 → buffer: [150, 100], buffered_rows=250
//   80  → buffer: [150, 100, 80], buffered_rows=330
//
// Step 4: push_batch([900])
//   → batch_size=900 > 300, large batch path
//   → buffered_rows=330 > 300 → Case 2: flush first
//   → flush: output [330] (combined [150, 100, 80])
//   → then bypass: output [900]
// Result: [330], [900] - prevents merge into [1230] which would be too large

// =============================================================================
// CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
// =============================================================================
// When the buffer is small enough, we still merge to maintain efficiency.
// Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
// Input: [300, 1200]
//
// Step 1: push_batch([300])
//   → batch_size=300 <= 500, normal path
//   → buffer: [300], buffered_rows=300
//
// Step 2: push_batch([1200])
//   → batch_size=1200 > 500, large batch path
//   → buffered_rows=300 <= 500 → Case 3: normal merge
//   → buffer: [300, 1200] (1500 total)
//   → 1500 > target_batch_size → split: output [1000], buffer [500]
// Result: [1000], [500] - normal split/merge behavior maintained
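
// ---------------------------------------------------------------------------
// The three cases above correspond to a single dispatch at the top of
// push_batch; this is a condensed sketch of the check this PR adds (the full
// version, with its explanatory comments, appears in the diff below):
//
//     let batch_size = batch.num_rows();
//     if let Some(limit) = self.biggest_coalesce_batch_size {
//         if batch_size > limit {
//             if self.buffered_rows == 0 {
//                 self.completed.push_back(batch);   // Case 1: bypass
//                 return Ok(());
//             }
//             if self.buffered_rows > limit {
//                 self.finish_buffered_batch()?;     // Case 2: flush...
//                 self.completed.push_back(batch);   // ...then bypass
//                 return Ok(());
//             }
//             // Case 3: fall through to normal coalescing
//         }
//     }
// ---------------------------------------------------------------------------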

// =============================================================================
// Comparison: Default vs Optimized Behavior
// =============================================================================
// target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
// Input: [600, 1200, 300]
//
// DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
// 600  → buffer: [600]
// 1200 → buffer: [600, 1200] (1800 rows total)
//      → split: output [1000 rows], buffer [800 rows remaining]
// 300  → buffer: [800, 300] (1100 rows total)
//      → split: output [1000 rows], buffer [100 rows remaining]
// Result: [1000], [1000], [100] - all outputs respect target_batch_size
//
// OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
// 600  → Case 1: direct bypass → output: [600]
// 1200 → Case 1: direct bypass → output: [1200]
// 300  → normal path → buffer: [300]
// Result: [600], [1200], [300] - large batches preserved

// =============================================================================
// Benefits and Trade-offs
// =============================================================================
// Benefits of the optimization:
// - Large batches stay intact (better for downstream vectorized processing)
// - Fewer split/merge operations (better CPU performance)
// - More predictable memory usage patterns
// - Maintains streaming efficiency while preserving batch boundaries
//
// Trade-offs:
// - Output batch sizes become variable (not always target_batch_size)
// - May produce smaller partial batches when flushing before large batches
// - Requires tuning the biggest_coalesce_batch_size parameter for optimal performance

// TODO: for unsorted batches, we may be able to filter out all large batches
// and coalesce all the small batches together?
```
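
As a usage sketch (not part of this diff), the new API combines with the existing `BatchCoalescer` methods like so; the schema and row counts here are illustrative only:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use arrow_select::coalesce::BatchCoalescer;

fn main() {
    let schema: SchemaRef =
        Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
    let make_batch = |n: usize| {
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..n as i32));
        RecordBatch::try_new(Arc::clone(&schema), vec![array]).unwrap()
    };

    // target_batch_size = 1000; batches larger than 500 rows bypass coalescing
    let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), 1000)
        .with_biggest_coalesce_batch_size(Some(500));

    // Case 1: empty buffer + large batch → emitted directly, intact
    coalescer.push_batch(make_batch(600)).unwrap();
    assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 600);

    // Normal path: a small batch is buffered until target_batch_size is reached
    coalescer.push_batch(make_batch(300)).unwrap();
    assert_eq!(coalescer.get_buffered_rows(), 300);
    coalescer.finish_buffered_batch().unwrap();
    assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 300);
}
```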
# What changes are included in this PR?

Adds more public API (`with_biggest_coalesce_batch_size`, `set_biggest_coalesce_batch_size`, `biggest_coalesce_batch_size`, and `get_buffered_rows`), which is needed by Apache DataFusion.

# Are these changes tested?

Yes, unit tests are added.

# Are there any user-facing changes?

No

---------

Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 arrow-select/src/coalesce.rs | 626 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 623 insertions(+), 3 deletions(-)

diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index 891d62fc3a..3ae31612c9 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -142,6 +142,8 @@ pub struct BatchCoalescer {
     buffered_rows: usize,
     /// Completed batches
     completed: VecDeque<RecordBatch>,
+    /// Biggest coalesce batch size. See [`Self::with_biggest_coalesce_batch_size`]
+    biggest_coalesce_batch_size: Option<usize>,
 }
 
 impl BatchCoalescer {
@@ -166,9 +168,41 @@ impl BatchCoalescer {
             // We will for sure store at least one completed batch
             completed: VecDeque::with_capacity(1),
             buffered_rows: 0,
+            biggest_coalesce_batch_size: None,
         }
     }
 
+    /// Set the coalesce batch size limit (default `None`)
+    ///
+    /// This limit determines when batches should bypass coalescing. Intuitively,
+    /// batches that are already large are costly to coalesce and are efficient
+    /// enough to process directly without coalescing.
+    ///
+    /// If `Some(limit)`, batches larger than this limit will bypass coalescing
+    /// when there is no buffered data, or when the previously buffered data
+    /// already exceeds this limit.
+    ///
+    /// If `None`, all batches will be coalesced according to the
+    /// `target_batch_size`.
+    pub fn with_biggest_coalesce_batch_size(mut self, limit: Option<usize>) -> Self {
+        self.biggest_coalesce_batch_size = limit;
+        self
+    }
+
+    /// Get the current biggest coalesce batch size limit
+    ///
+    /// See [`Self::with_biggest_coalesce_batch_size`] for details
+    pub fn biggest_coalesce_batch_size(&self) -> Option<usize> {
+        self.biggest_coalesce_batch_size
+    }
+
+    /// Set the biggest coalesce batch size limit
+    ///
+    /// See [`Self::with_biggest_coalesce_batch_size`] for details
+    pub fn set_biggest_coalesce_batch_size(&mut self, limit: Option<usize>) {
+        self.biggest_coalesce_batch_size = limit;
+    }
+
     /// Return the schema of the output batches
     pub fn schema(&self) -> SchemaRef {
         Arc::clone(&self.schema)
@@ -236,11 +270,160 @@ impl BatchCoalescer {
     ///     assert_eq!(completed_batch, expected_batch);
     /// ```
     pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
-        let (_schema, arrays, mut num_rows) = batch.into_parts();
-        if num_rows == 0 {
+        // Large batch bypass optimization:
+        // When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
+        // we can avoid expensive split-and-merge operations by passing it through directly.
+        //
+        // IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
+        // is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
+        // If not set (None), ALL batches follow normal coalescing behavior regardless of size.
+
+        // =============================================================================
+        // CASE 1: No buffer + large batch → Direct bypass
+        // =============================================================================
+        // Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
+        // Input sequence: [600, 1200, 300]
+        //
+        // With biggest_coalesce_batch_size=Some(500) (optimization enabled):
+        // 600  → large batch detected! buffered_rows=0 → Case 1: direct bypass
+        //      → output: [600] (bypass, preserves large batch)
+        // 1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
+        //      → output: [1200] (bypass, preserves large batch)
+        // 300  → normal batch, buffer: [300]
+        // Result: [600], [1200], [300] - large batches preserved, mixed sizes
+
+        // =============================================================================
+        // CASE 2: Buffer too large + large batch → Flush first, then bypass
+        // =============================================================================
+        // This case prevents creating extremely large merged batches that would
+        // significantly exceed both target_batch_size and biggest_coalesce_batch_size.
+        //
+        // Example 1: Buffer exceeds limit before large batch arrives
+        // target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
+        // Input: [350, 200, 800]
+        //
+        // Step 1: push_batch([350])
+        //   → batch_size=350 <= 400, normal path
+        //   → buffer: [350], buffered_rows=350
+        //
+        // Step 2: push_batch([200])
+        //   → batch_size=200 <= 400, normal path
+        //   → buffer: [350, 200], buffered_rows=550
+        //
+        // Step 3: push_batch([800])
+        //   → batch_size=800 > 400, large batch path
+        //   → buffered_rows=550 > 400 → Case 2: flush first
+        //   → flush: output [550] (combined [350, 200])
+        //   → then bypass: output [800]
+        // Result: [550], [800] - buffer flushed to prevent oversized merge
+        //
+        // Example 2: Multiple small batches accumulate before large batch
+        // target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
+        // Input: [150, 100, 80, 900]
+        //
+        // Steps 1-3: Accumulate small batches
+        //   150 → buffer: [150], buffered_rows=150
+        //   100 → buffer: [150, 100], buffered_rows=250
+        //   80  → buffer: [150, 100, 80], buffered_rows=330
+        //
+        // Step 4: push_batch([900])
+        //   → batch_size=900 > 300, large batch path
+        //   → buffered_rows=330 > 300 → Case 2: flush first
+        //   → flush: output [330] (combined [150, 100, 80])
+        //   → then bypass: output [900]
+        // Result: [330], [900] - prevents merge into [1230] which would be too large
+
+        // =============================================================================
+        // CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
+        // =============================================================================
+        // When the buffer is small enough, we still merge to maintain efficiency.
+        // Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
+        // Input: [300, 1200]
+        //
+        // Step 1: push_batch([300])
+        //   → batch_size=300 <= 500, normal path
+        //   → buffer: [300], buffered_rows=300
+        //
+        // Step 2: push_batch([1200])
+        //   → batch_size=1200 > 500, large batch path
+        //   → buffered_rows=300 <= 500 → Case 3: normal merge
+        //   → buffer: [300, 1200] (1500 total)
+        //   → 1500 > target_batch_size → split: output [1000], buffer [500]
+        // Result: [1000], [500] - normal split/merge behavior maintained
+
+        // =============================================================================
+        // Comparison: Default vs Optimized Behavior
+        // =============================================================================
+        // target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
+        // Input: [600, 1200, 300]
+        //
+        // DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
+        // 600  → buffer: [600]
+        // 1200 → buffer: [600, 1200] (1800 rows total)
+        //      → split: output [1000 rows], buffer [800 rows remaining]
+        // 300  → buffer: [800, 300] (1100 rows total)
+        //      → split: output [1000 rows], buffer [100 rows remaining]
+        // Result: [1000], [1000], [100] - all outputs respect target_batch_size
+        //
+        // OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
+        // 600  → Case 1: direct bypass → output: [600]
+        // 1200 → Case 1: direct bypass → output: [1200]
+        // 300  → normal path → buffer: [300]
+        // Result: [600], [1200], [300] - large batches preserved
+
+        // =============================================================================
+        // Benefits and Trade-offs
+        // =============================================================================
+        // Benefits of the optimization:
+        // - Large batches stay intact (better for downstream vectorized processing)
+        // - Fewer split/merge operations (better CPU performance)
+        // - More predictable memory usage patterns
+        // - Maintains streaming efficiency while preserving batch boundaries
+        //
+        // Trade-offs:
+        // - Output batch sizes become variable (not always target_batch_size)
+        // - May produce smaller partial batches when flushing before large batches
+        // - Requires tuning the biggest_coalesce_batch_size parameter for optimal performance
+
+        // TODO: for unsorted batches, we may be able to filter out all large batches
+        // and coalesce all the small batches together?
+
+        let batch_size = batch.num_rows();
+
+        // Fast path: skip empty batches
+        if batch_size == 0 {
             return Ok(());
         }
 
+        // Large batch optimization: bypass coalescing for oversized batches
+        if let Some(limit) = self.biggest_coalesce_batch_size {
+            if batch_size > limit {
+                // Case 1: No buffered data - emit large batch directly
+                // Example: [] + [1200] → output [1200], buffer []
+                if self.buffered_rows == 0 {
+                    self.completed.push_back(batch);
+                    return Ok(());
+                }
+
+                // Case 2: Buffer too large - flush then emit to avoid oversized merge
+                // Example: [850] + [1200] → output [850], then output [1200]
+                // This prevents creating batches much larger than both target_batch_size
+                // and biggest_coalesce_batch_size, which could cause memory issues
+                if self.buffered_rows > limit {
+                    self.finish_buffered_batch()?;
+                    self.completed.push_back(batch);
+                    return Ok(());
+                }
+
+                // Case 3: Small buffer - proceed with normal coalescing
+                // Example: [300] + [1200] → split and merge normally
+                // This ensures small batches still get properly coalesced
+                // while allowing some controlled growth beyond the limit
+            }
+        }
+
+        let (_schema, arrays, mut num_rows) = batch.into_parts();
+
         // setup input rows
         assert_eq!(arrays.len(), self.in_progress_arrays.len());
         self.in_progress_arrays
@@ -290,6 +473,11 @@ impl BatchCoalescer {
         Ok(())
     }
 
+    /// Returns the number of buffered rows
+    pub fn get_buffered_rows(&self) -> usize {
+        self.buffered_rows
+    }
+
     /// Concatenates any buffered batches into a single `RecordBatch` and
     /// clears any output buffers
     ///
@@ -394,7 +582,7 @@ mod tests {
     use arrow_array::builder::StringViewBuilder;
     use arrow_array::cast::AsArray;
     use arrow_array::{
-        BinaryViewArray, Int64Array, RecordBatchOptions, StringArray, StringViewArray,
+        BinaryViewArray, Int32Array, Int64Array, RecordBatchOptions, StringArray, StringViewArray,
         TimestampNanosecondArray, UInt32Array,
     };
     use arrow_schema::{DataType, Field, Schema};
@@ -1314,4 +1502,436 @@ mod tests {
         let options = RecordBatchOptions::new().with_row_count(Some(row_count));
         RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
     }
+
+    /// Helper function to create a test batch with the specified number of rows
+    fn create_test_batch(num_rows: usize) -> RecordBatch {
+        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
+        let array = Int32Array::from_iter_values(0..num_rows as i32);
+        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_none_default() {
+        // Test that the default behavior (None) coalesces all batches
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+
+        // Push a large batch (1000 rows) - should be coalesced normally
+        let large_batch = create_test_batch(1000);
+        coalescer.push_batch(large_batch).unwrap();
+
+        // Should produce multiple batches of target size (100)
+        let mut output_batches = vec![];
+        while let Some(batch) = coalescer.next_completed_batch() {
+            output_batches.push(batch);
+        }
+
+        coalescer.finish_buffered_batch().unwrap();
+        while let Some(batch) = coalescer.next_completed_batch() {
+            output_batches.push(batch);
+        }
+
+        // Should have 10 batches of 100 rows each
+        assert_eq!(output_batches.len(), 10);
+        for batch in output_batches {
+            assert_eq!(batch.num_rows(), 100);
+        }
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_bypass_large_batch() {
+        // Test that batches larger than biggest_coalesce_batch_size bypass coalescing
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(500));
+
+        // Push a large batch (1000 rows) - should bypass coalescing
+        let large_batch = create_test_batch(1000);
+        coalescer.push_batch(large_batch.clone()).unwrap();
+
+        // Should have one completed batch immediately (the original large batch)
+        assert!(coalescer.has_completed_batch());
+        let output_batch = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output_batch.num_rows(), 1000);
+
+        // Should be no more completed batches
+        assert!(!coalescer.has_completed_batch());
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_coalesce_small_batch() {
+        // Test that batches smaller than biggest_coalesce_batch_size are coalesced normally
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(500));
+
+        // Push small batches that should be coalesced
+        let small_batch = create_test_batch(50);
+        coalescer.push_batch(small_batch.clone()).unwrap();
+
+        // Should not have a completed batch yet (only 50 rows, target is 100)
+        assert!(!coalescer.has_completed_batch());
+        assert_eq!(coalescer.get_buffered_rows(), 50);
+
+        // Push another small batch
+        coalescer.push_batch(small_batch).unwrap();
+
+        // Now should have a completed batch (100 rows total)
+        assert!(coalescer.has_completed_batch());
+        let output_batch = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output_batch.num_rows(), 100);
+
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_equal_boundary() {
+        // Test behavior when the batch size equals biggest_coalesce_batch_size
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(500));
+
+        // Push a batch exactly equal to the limit
+        let boundary_batch = create_test_batch(500);
+        coalescer.push_batch(boundary_batch).unwrap();
+
+        // Should be coalesced (not bypassed), since it is equal to, not greater than, the limit
+        let mut output_count = 0;
+        while coalescer.next_completed_batch().is_some() {
+            output_count += 1;
+        }
+
+        coalescer.finish_buffered_batch().unwrap();
+        while coalescer.next_completed_batch().is_some() {
+            output_count += 1;
+        }
+
+        // Should have 5 batches of 100 rows each
+        assert_eq!(output_count, 5);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_first_large_then_consecutive_bypass() {
+        // Test the consecutive large batch bypass behavior
+        // Pattern: small batches -> first large batch (coalesced) -> consecutive large batches (bypass)
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(200));
+
+        let small_batch = create_test_batch(50);
+
+        // Push a small batch first to create buffered data
+        coalescer.push_batch(small_batch).unwrap();
+        assert_eq!(coalescer.get_buffered_rows(), 50);
+        assert!(!coalescer.has_completed_batch());
+
+        // Push the first large batch - should go through normal coalescing due to buffered data
+        let large_batch1 = create_test_batch(250);
+        coalescer.push_batch(large_batch1).unwrap();
+
+        // 50 + 250 = 300 -> 3 complete batches of 100, 0 rows buffered
+        let mut completed_batches = vec![];
+        while let Some(batch) = coalescer.next_completed_batch() {
+            completed_batches.push(batch);
+        }
+        assert_eq!(completed_batches.len(), 3);
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+
+        // Now push consecutive large batches - they should bypass
+        let large_batch2 = create_test_batch(300);
+        let large_batch3 = create_test_batch(400);
+
+        // Push the second large batch - should bypass since the buffer is now empty (Case 1)
+        coalescer.push_batch(large_batch2).unwrap();
+        assert!(coalescer.has_completed_batch());
+        let output = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output.num_rows(), 300); // bypassed with original size
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+
+        // Push the third large batch - should also bypass
+        coalescer.push_batch(large_batch3).unwrap();
+        assert!(coalescer.has_completed_batch());
+        let output = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output.num_rows(), 400); // bypassed with original size
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_empty_batch() {
+        // Test that empty batches do not trigger the bypass logic
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(50));
+
+        let empty_batch = create_test_batch(0);
+        coalescer.push_batch(empty_batch).unwrap();
+
+        // The empty batch should be handled normally (no effect)
+        assert!(!coalescer.has_completed_batch());
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_with_buffered_data_no_bypass() {
+        // Test that when there is a small amount of buffered data, large batches
+        // do NOT bypass and are merged normally (Case 3)
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(200));
+
+        // Add some buffered data first
+        let small_batch = create_test_batch(30);
+        coalescer.push_batch(small_batch.clone()).unwrap();
+        coalescer.push_batch(small_batch).unwrap();
+        assert_eq!(coalescer.get_buffered_rows(), 60);
+
+        // Push a large batch that would normally bypass, but shouldn't because buffered_rows > 0
+        let large_batch = create_test_batch(250);
+        coalescer.push_batch(large_batch).unwrap();
+
+        // The large batch should be processed through the normal coalescing logic
+        // Total: 60 (buffered) + 250 (new) = 310 rows
+        // Output: 3 complete batches of 100 rows each, 10 rows remain buffered
+        let mut completed_batches = vec![];
+        while let Some(batch) = coalescer.next_completed_batch() {
+            completed_batches.push(batch);
+        }
+
+        assert_eq!(completed_batches.len(), 3);
+        for batch in &completed_batches {
+            assert_eq!(batch.num_rows(), 100);
+        }
+        assert_eq!(coalescer.get_buffered_rows(), 10);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_zero_limit() {
+        // Test the edge case where the limit is 0 (all batches bypass when there is no buffered data)
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(0));
+
+        // Even a 1-row batch should bypass when there's no buffered data
+        let tiny_batch = create_test_batch(1);
+        coalescer.push_batch(tiny_batch).unwrap();
+
+        assert!(coalescer.has_completed_batch());
+        let output = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output.num_rows(), 1);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_bypass_only_when_no_buffer() {
+        // Test that bypass only occurs when buffered_rows == 0
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(200));
+
+        // First, push a large batch with no buffered data - should bypass
+        let large_batch = create_test_batch(300);
+        coalescer.push_batch(large_batch.clone()).unwrap();
+
+        assert!(coalescer.has_completed_batch());
+        let output = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output.num_rows(), 300); // bypassed
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+
+        // Now add some buffered data
+        let small_batch = create_test_batch(50);
+        coalescer.push_batch(small_batch).unwrap();
+        assert_eq!(coalescer.get_buffered_rows(), 50);
+
+        // Push the same large batch again - should NOT bypass this time
+        // (buffered_rows=50 <= limit, so Case 3 applies)
+        coalescer.push_batch(large_batch).unwrap();
+
+        // Should process through normal coalescing: 50 + 300 = 350 rows
+        // Output: 3 complete batches of 100 rows, 50 rows buffered
+        let mut completed_batches = vec![];
+        while let Some(batch) = coalescer.next_completed_batch() {
+            completed_batches.push(batch);
+        }
+
+        assert_eq!(completed_batches.len(), 3);
+        for batch in &completed_batches {
+            assert_eq!(batch.num_rows(), 100);
+        }
+        assert_eq!(coalescer.get_buffered_rows(), 50);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_consecutive_large_batches_scenario() {
+        // Test the scenario: 20, 20, 30, 700, 600, 700, 900, 700, 600
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            1000,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(500));
+
+        // Push small batches first
+        coalescer.push_batch(create_test_batch(20)).unwrap();
+        coalescer.push_batch(create_test_batch(20)).unwrap();
+        coalescer.push_batch(create_test_batch(30)).unwrap();
+
+        assert_eq!(coalescer.get_buffered_rows(), 70);
+        assert!(!coalescer.has_completed_batch());
+
+        // Push the first large batch (700) - merged normally (Case 3), since
+        // buffered_rows=70 is still below the limit
+        coalescer.push_batch(create_test_batch(700)).unwrap();
+
+        // 70 + 700 = 770 rows, not enough for 1000, so all stay buffered
+        assert_eq!(coalescer.get_buffered_rows(), 770);
+        assert!(!coalescer.has_completed_batch());
+
+        // Push the second large batch (600) - buffered_rows=770 now exceeds the
+        // limit, so the buffer is flushed first and the 600 bypasses (Case 2)
+        coalescer.push_batch(create_test_batch(600)).unwrap();
+
+        // Should flush the buffer (770 rows) and bypass the 600
+        let mut outputs = vec![];
+        while let Some(batch) = coalescer.next_completed_batch() {
+            outputs.push(batch);
+        }
+        assert_eq!(outputs.len(), 2); // one flushed buffer batch (770) + one bypassed (600)
+        assert_eq!(outputs[0].num_rows(), 770);
+        assert_eq!(outputs[1].num_rows(), 600);
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+
+        // Push the remaining large batches - should all bypass (Case 1)
+        let remaining_batches = [700, 900, 700, 600];
+        for &size in &remaining_batches {
+            coalescer.push_batch(create_test_batch(size)).unwrap();
+
+            assert!(coalescer.has_completed_batch());
+            let output = coalescer.next_completed_batch().unwrap();
+            assert_eq!(output.num_rows(), size);
+            assert_eq!(coalescer.get_buffered_rows(), 0);
+        }
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_truly_consecutive_large_bypass() {
+        // Test truly consecutive large batches that should all bypass
+        // This test ensures the buffer is completely empty between large batches
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(200));
+
+        // Push consecutive large batches with no prior buffered data
+        let large_batches = vec![
+            create_test_batch(300),
+            create_test_batch(400),
+            create_test_batch(350),
+            create_test_batch(500),
+        ];
+
+        let mut all_outputs = vec![];
+
+        for (i, large_batch) in large_batches.into_iter().enumerate() {
+            let expected_size = large_batch.num_rows();
+
+            // Buffer should be empty before each large batch
+            assert_eq!(
+                coalescer.get_buffered_rows(),
+                0,
+                "Buffer should be empty before batch {}",
+                i
+            );
+
+            coalescer.push_batch(large_batch).unwrap();
+
+            // Each large batch should bypass and produce exactly one output batch
+            assert!(
+                coalescer.has_completed_batch(),
+                "Should have completed batch after pushing batch {}",
+                i
+            );
+
+            let output = coalescer.next_completed_batch().unwrap();
+            assert_eq!(
+                output.num_rows(),
+                expected_size,
+                "Batch {} should have bypassed with original size",
+                i
+            );
+
+            // Should be no more batches, and the buffer should be empty
+            assert!(
+                !coalescer.has_completed_batch(),
+                "Should have no more completed batches after batch {}",
+                i
+            );
+            assert_eq!(
+                coalescer.get_buffered_rows(),
+                0,
+                "Buffer should be empty after batch {}",
+                i
+            );
+
+            all_outputs.push(output);
+        }
+
+        // Verify we got exactly 4 output batches with original sizes
+        assert_eq!(all_outputs.len(), 4);
+        assert_eq!(all_outputs[0].num_rows(), 300);
+        assert_eq!(all_outputs[1].num_rows(), 400);
+        assert_eq!(all_outputs[2].num_rows(), 350);
+        assert_eq!(all_outputs[3].num_rows(), 500);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_reset_consecutive_on_small_batch() {
+        // Test that a small batch creates buffered data, so a following large
+        // batch is coalesced instead of bypassed
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(200));
+
+        // Push the first large batch - should bypass (no buffered data)
+        coalescer.push_batch(create_test_batch(300)).unwrap();
+        let output = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output.num_rows(), 300);
+
+        // Push the second large batch - should bypass (buffer still empty)
+        coalescer.push_batch(create_test_batch(400)).unwrap();
+        let output = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output.num_rows(), 400);
+
+        // Push a small batch - it is buffered, ending the bypass run
+        coalescer.push_batch(create_test_batch(50)).unwrap();
+        assert_eq!(coalescer.get_buffered_rows(), 50);
+
+        // Push a large batch again - should NOT bypass due to the buffered data
+        coalescer.push_batch(create_test_batch(350)).unwrap();
+
+        // Should coalesce: 50 + 350 = 400 -> 4 complete batches of 100
+        let mut outputs = vec![];
+        while let Some(batch) = coalescer.next_completed_batch() {
+            outputs.push(batch);
+        }
+        assert_eq!(outputs.len(), 4);
+        for batch in outputs {
+            assert_eq!(batch.num_rows(), 100);
+        }
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+    }
+}