This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new e0f9382ea5 feat: support push batch direct to completed and add biggest coalesce batch support (#8146)
e0f9382ea5 is described below

commit e0f9382ea593e0884ba02bed5ef07e78a1fd8fc5
Author: Qi Zhu <821684...@qq.com>
AuthorDate: Tue Aug 19 19:29:28 2025 +0800

    feat: support push batch direct to completed and add biggest coalesce batch support (#8146)
    
    # Which issue does this PR close?
    
    needed for:
    https://github.com/apache/datafusion/pull/17193
    
    # Rationale for this change
    ```rust
            // Large batch bypass optimization:
            // When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
            // we can avoid expensive split-and-merge operations by passing it through directly.
            //
            // IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
            // is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
            // If not set (None), ALL batches follow normal coalescing behavior regardless of size.

            // =============================================================================
            // CASE 1: No buffer + large batch → Direct bypass
            // =============================================================================
            // Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
            // Input sequence: [600, 1200, 300]
            //
            // With biggest_coalesce_batch_size=Some(500) (optimization enabled):
            //   600 → large batch detected! buffered_rows=0 → Case 1: direct bypass
            //        → output: [600] (bypass, preserves large batch)
            //   1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
            //         → output: [1200] (bypass, preserves large batch)
            //   300 → normal batch, buffer: [300]
            //   Result: [600], [1200], [300] - large batches preserved, mixed sizes

            // =============================================================================
            // CASE 2: Buffer too large + large batch → Flush first, then bypass
            // =============================================================================
            // This case prevents creating extremely large merged batches that would
            // significantly exceed both target_batch_size and biggest_coalesce_batch_size.
            //
            // Example 1: Buffer exceeds limit before large batch arrives
            // target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
            // Input: [350, 200, 800]
            //
            // Step 1: push_batch([350])
            //   → batch_size=350 <= 400, normal path
            //   → buffer: [350], buffered_rows=350
            //
            // Step 2: push_batch([200])
            //   → batch_size=200 <= 400, normal path
            //   → buffer: [350, 200], buffered_rows=550
            //
            // Step 3: push_batch([800])
            //   → batch_size=800 > 400, large batch path
            //   → buffered_rows=550 > 400 → Case 2: flush first
            //   → flush: output [550] (combined [350, 200])
            //   → then bypass: output [800]
            //   Result: [550], [800] - buffer flushed to prevent oversized merge
            //
            // Example 2: Multiple small batches accumulate before large batch
            // target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
            // Input: [150, 100, 80, 900]
            //
            // Step 1-3: Accumulate small batches
            //   150 → buffer: [150], buffered_rows=150
            //   100 → buffer: [150, 100], buffered_rows=250
            //   80  → buffer: [150, 100, 80], buffered_rows=330
            //
            // Step 4: push_batch([900])
            //   → batch_size=900 > 300, large batch path
            //   → buffered_rows=330 > 300 → Case 2: flush first
            //   → flush: output [330] (combined [150, 100, 80])
            //   → then bypass: output [900]
            //   Result: [330], [900] - prevents merge into [1230] which would be too large

            // =============================================================================
            // CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
            // =============================================================================
            // When buffer is small enough, we still merge to maintain efficiency
            // Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
            // Input: [300, 1200]
            //
            // Step 1: push_batch([300])
            //   → batch_size=300 <= 500, normal path
            //   → buffer: [300], buffered_rows=300
            //
            // Step 2: push_batch([1200])
            //   → batch_size=1200 > 500, large batch path
            //   → buffered_rows=300 <= 500 → Case 3: normal merge
            //   → buffer: [300, 1200] (1500 total)
            //   → 1500 > target_batch_size → split: output [1000], buffer [500]
            //   Result: [1000], [500] - normal split/merge behavior maintained

            // =============================================================================
            // Comparison: Default vs Optimized Behavior
            // =============================================================================
            // target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
            // Input: [600, 1200, 300]
            //
            // DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
            //   600 → buffer: [600]
            //   1200 → buffer: [600, 1200] (1800 rows total)
            //         → split: output [1000 rows], buffer [800 rows remaining]
            //   300 → buffer: [800, 300] (1100 rows total)
            //        → split: output [1000 rows], buffer [100 rows remaining]
            //   Result: [1000], [1000], [100] - all outputs respect target_batch_size
            //
            // OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
            //   600 → Case 1: direct bypass → output: [600]
            //   1200 → Case 1: direct bypass → output: [1200]
            //   300 → normal path → buffer: [300]
            //   Result: [600], [1200], [300] - large batches preserved

            // =============================================================================
            // Benefits and Trade-offs
            // =============================================================================
            // Benefits of the optimization:
            // - Large batches stay intact (better for downstream vectorized processing)
            // - Fewer split/merge operations (better CPU performance)
            // - More predictable memory usage patterns
            // - Maintains streaming efficiency while preserving batch boundaries
            //
            // Trade-offs:
            // - Output batch sizes become variable (not always target_batch_size)
            // - May produce smaller partial batches when flushing before large batches
            // - Requires tuning biggest_coalesce_batch_size parameter for optimal performance

            // TODO: for unsorted batches, we may be able to filter out all large batches
            // and coalesce all small batches together?
    ```
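    
    As a rough sketch of how the new APIs added in this PR can be used (the
    schema, helper function, and row counts below are illustrative only):
    
    ```rust
    use std::sync::Arc;
    
    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{ArrowError, DataType, Field, Schema};
    use arrow_select::coalesce::BatchCoalescer;
    
    /// Build a single-column Int32 batch with `num_rows` rows (test helper)
    fn make_batch(num_rows: usize) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
        let array = Int32Array::from_iter_values(0..num_rows as i32);
        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
    }
    
    fn main() -> Result<(), ArrowError> {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
        // target_batch_size=1000; batches larger than 500 rows may bypass coalescing
        let mut coalescer =
            BatchCoalescer::new(schema, 1000).with_biggest_coalesce_batch_size(Some(500));
    
        // Case 1: empty buffer + large batch → emitted directly, unsplit
        coalescer.push_batch(make_batch(600))?;
        assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 600);
    
        // Small batches still buffer and coalesce toward the target size
        coalescer.push_batch(make_batch(300))?;
        assert_eq!(coalescer.get_buffered_rows(), 300);
    
        // Case 3: buffered 300 <= 500, so the large batch merges normally:
        // 300 + 1200 = 1500 → one 1000-row batch out, 500 rows stay buffered
        coalescer.push_batch(make_batch(1200))?;
        assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 1000);
        assert_eq!(coalescer.get_buffered_rows(), 500);
        Ok(())
    }
    ```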
    
    # What changes are included in this PR?
    
    Adds new public APIs needed by Apache DataFusion:
    `with_biggest_coalesce_batch_size`, `biggest_coalesce_batch_size`,
    `set_biggest_coalesce_batch_size`, and `get_buffered_rows`.
    
    # Are these changes tested?
    
    Yes. Unit tests were added.
    
    # Are there any user-facing changes?
    
    No
    
    ---------
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 arrow-select/src/coalesce.rs | 626 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 623 insertions(+), 3 deletions(-)

diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index 891d62fc3a..3ae31612c9 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -142,6 +142,8 @@ pub struct BatchCoalescer {
     buffered_rows: usize,
     /// Completed batches
     completed: VecDeque<RecordBatch>,
+    /// Biggest coalesce batch size. See [`Self::with_biggest_coalesce_batch_size`]
+    biggest_coalesce_batch_size: Option<usize>,
 }
 
 impl BatchCoalescer {
@@ -166,9 +168,41 @@ impl BatchCoalescer {
             // We will for sure store at least one completed batch
             completed: VecDeque::with_capacity(1),
             buffered_rows: 0,
+            biggest_coalesce_batch_size: None,
         }
     }
 
+    /// Set the coalesce batch size limit (default `None`)
+    ///
+    /// This limit determines when batches should bypass coalescing. Intuitively,
+    /// batches that are already large are costly to coalesce and are efficient
+    /// enough to process directly without coalescing.
+    ///
+    /// If `Some(limit)`, batches larger than this limit will bypass coalescing
+    /// when there is no buffered data, or when the previously buffered data
+    /// already exceeds this limit.
+    ///
+    /// If `None`, all batches will be coalesced according to the target_batch_size.
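+    ///
+    /// # Example
+    ///
+    /// A minimal sketch (sizes are arbitrary; see [`Self::push_batch`] for the
+    /// full bypass rules):
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::{Int32Array, RecordBatch};
+    /// # use arrow_schema::{DataType, Field, Schema};
+    /// # use arrow_select::coalesce::BatchCoalescer;
+    /// let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
+    /// let mut coalescer =
+    ///     BatchCoalescer::new(Arc::clone(&schema), 1000).with_biggest_coalesce_batch_size(Some(500));
+    /// // 600 rows > limit of 500 and nothing is buffered, so the batch is
+    /// // emitted directly instead of being buffered toward the target of 1000
+    /// let batch = RecordBatch::try_new(
+    ///     schema,
+    ///     vec![Arc::new(Int32Array::from_iter_values(0..600))],
+    /// )
+    /// .unwrap();
+    /// coalescer.push_batch(batch).unwrap();
+    /// assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 600);
+    /// ```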
+    pub fn with_biggest_coalesce_batch_size(mut self, limit: Option<usize>) -> Self {
+        self.biggest_coalesce_batch_size = limit;
+        self
+    }
+
+    /// Get the current biggest coalesce batch size limit
+    ///
+    /// See [`Self::with_biggest_coalesce_batch_size`] for details
+    pub fn biggest_coalesce_batch_size(&self) -> Option<usize> {
+        self.biggest_coalesce_batch_size
+    }
+
+    /// Set the biggest coalesce batch size limit
+    ///
+    /// See [`Self::with_biggest_coalesce_batch_size`] for details
+    pub fn set_biggest_coalesce_batch_size(&mut self, limit: Option<usize>) {
+        self.biggest_coalesce_batch_size = limit;
+    }
+
     /// Return the schema of the output batches
     pub fn schema(&self) -> SchemaRef {
         Arc::clone(&self.schema)
@@ -236,11 +270,160 @@ impl BatchCoalescer {
     /// assert_eq!(completed_batch, expected_batch);
     /// ```
    pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
-        let (_schema, arrays, mut num_rows) = batch.into_parts();
-        if num_rows == 0 {
+        // Large batch bypass optimization:
+        // When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
+        // we can avoid expensive split-and-merge operations by passing it through directly.
+        //
+        // IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
+        // is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
+        // If not set (None), ALL batches follow normal coalescing behavior regardless of size.
+
+        // =============================================================================
+        // CASE 1: No buffer + large batch → Direct bypass
+        // =============================================================================
+        // Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
+        // Input sequence: [600, 1200, 300]
+        //
+        // With biggest_coalesce_batch_size=Some(500) (optimization enabled):
+        //   600 → large batch detected! buffered_rows=0 → Case 1: direct bypass
+        //        → output: [600] (bypass, preserves large batch)
+        //   1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
+        //         → output: [1200] (bypass, preserves large batch)
+        //   300 → normal batch, buffer: [300]
+        //   Result: [600], [1200], [300] - large batches preserved, mixed sizes
+
+        // =============================================================================
+        // CASE 2: Buffer too large + large batch → Flush first, then bypass
+        // =============================================================================
+        // This case prevents creating extremely large merged batches that would
+        // significantly exceed both target_batch_size and biggest_coalesce_batch_size.
+        //
+        // Example 1: Buffer exceeds limit before large batch arrives
+        // target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
+        // Input: [350, 200, 800]
+        //
+        // Step 1: push_batch([350])
+        //   → batch_size=350 <= 400, normal path
+        //   → buffer: [350], buffered_rows=350
+        //
+        // Step 2: push_batch([200])
+        //   → batch_size=200 <= 400, normal path
+        //   → buffer: [350, 200], buffered_rows=550
+        //
+        // Step 3: push_batch([800])
+        //   → batch_size=800 > 400, large batch path
+        //   → buffered_rows=550 > 400 → Case 2: flush first
+        //   → flush: output [550] (combined [350, 200])
+        //   → then bypass: output [800]
+        //   Result: [550], [800] - buffer flushed to prevent oversized merge
+        //
+        // Example 2: Multiple small batches accumulate before large batch
+        // target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
+        // Input: [150, 100, 80, 900]
+        //
+        // Step 1-3: Accumulate small batches
+        //   150 → buffer: [150], buffered_rows=150
+        //   100 → buffer: [150, 100], buffered_rows=250
+        //   80  → buffer: [150, 100, 80], buffered_rows=330
+        //
+        // Step 4: push_batch([900])
+        //   → batch_size=900 > 300, large batch path
+        //   → buffered_rows=330 > 300 → Case 2: flush first
+        //   → flush: output [330] (combined [150, 100, 80])
+        //   → then bypass: output [900]
+        //   Result: [330], [900] - prevents merge into [1230] which would be too large
+
+        // =============================================================================
+        // CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
+        // =============================================================================
+        // When buffer is small enough, we still merge to maintain efficiency
+        // Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
+        // Input: [300, 1200]
+        //
+        // Step 1: push_batch([300])
+        //   → batch_size=300 <= 500, normal path
+        //   → buffer: [300], buffered_rows=300
+        //
+        // Step 2: push_batch([1200])
+        //   → batch_size=1200 > 500, large batch path
+        //   → buffered_rows=300 <= 500 → Case 3: normal merge
+        //   → buffer: [300, 1200] (1500 total)
+        //   → 1500 > target_batch_size → split: output [1000], buffer [500]
+        //   Result: [1000], [500] - normal split/merge behavior maintained
+
+        // =============================================================================
+        // Comparison: Default vs Optimized Behavior
+        // =============================================================================
+        // target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
+        // Input: [600, 1200, 300]
+        //
+        // DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
+        //   600 → buffer: [600]
+        //   1200 → buffer: [600, 1200] (1800 rows total)
+        //         → split: output [1000 rows], buffer [800 rows remaining]
+        //   300 → buffer: [800, 300] (1100 rows total)
+        //        → split: output [1000 rows], buffer [100 rows remaining]
+        //   Result: [1000], [1000], [100] - all outputs respect target_batch_size
+        //
+        // OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
+        //   600 → Case 1: direct bypass → output: [600]
+        //   1200 → Case 1: direct bypass → output: [1200]
+        //   300 → normal path → buffer: [300]
+        //   Result: [600], [1200], [300] - large batches preserved
+
+        // =============================================================================
+        // Benefits and Trade-offs
+        // =============================================================================
+        // Benefits of the optimization:
+        // - Large batches stay intact (better for downstream vectorized processing)
+        // - Fewer split/merge operations (better CPU performance)
+        // - More predictable memory usage patterns
+        // - Maintains streaming efficiency while preserving batch boundaries
+        //
+        // Trade-offs:
+        // - Output batch sizes become variable (not always target_batch_size)
+        // - May produce smaller partial batches when flushing before large batches
+        // - Requires tuning biggest_coalesce_batch_size parameter for optimal performance
+
+        // TODO: for unsorted batches, we may be able to filter out all large batches
+        // and coalesce all small batches together?
+
+        let batch_size = batch.num_rows();
+
+        // Fast path: skip empty batches
+        if batch_size == 0 {
             return Ok(());
         }
 
+        // Large batch optimization: bypass coalescing for oversized batches
+        if let Some(limit) = self.biggest_coalesce_batch_size {
+            if batch_size > limit {
+                // Case 1: No buffered data - emit large batch directly
+                // Example: [] + [1200] → output [1200], buffer []
+                if self.buffered_rows == 0 {
+                    self.completed.push_back(batch);
+                    return Ok(());
+                }
+
+                // Case 2: Buffer too large - flush then emit to avoid oversized merge
+                // Example: [850] + [1200] → output [850], then output [1200]
+                // This prevents creating batches much larger than both target_batch_size
+                // and biggest_coalesce_batch_size, which could cause memory issues
+                if self.buffered_rows > limit {
+                    self.finish_buffered_batch()?;
+                    self.completed.push_back(batch);
+                    return Ok(());
+                }
+
+                // Case 3: Small buffer - proceed with normal coalescing
+                // Example: [300] + [1200] → split and merge normally
+                // This ensures small batches still get properly coalesced
+                // while allowing some controlled growth beyond the limit
+            }
+        }
+
+        let (_schema, arrays, mut num_rows) = batch.into_parts();
+
         // setup input rows
         assert_eq!(arrays.len(), self.in_progress_arrays.len());
         self.in_progress_arrays
@@ -290,6 +473,11 @@ impl BatchCoalescer {
         Ok(())
     }
 
+    /// Returns the number of buffered rows
+    pub fn get_buffered_rows(&self) -> usize {
+        self.buffered_rows
+    }
+
     /// Concatenates any buffered batches into a single `RecordBatch` and
     /// clears any output buffers
     ///
@@ -394,7 +582,7 @@ mod tests {
     use arrow_array::builder::StringViewBuilder;
     use arrow_array::cast::AsArray;
     use arrow_array::{
-        BinaryViewArray, Int64Array, RecordBatchOptions, StringArray, StringViewArray,
+        BinaryViewArray, Int32Array, Int64Array, RecordBatchOptions, StringArray, StringViewArray,
         TimestampNanosecondArray, UInt32Array,
     };
     use arrow_schema::{DataType, Field, Schema};
@@ -1314,4 +1502,436 @@ mod tests {
         let options = RecordBatchOptions::new().with_row_count(Some(row_count));
         RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
     }
+
+    /// Helper function to create a test batch with specified number of rows
+    fn create_test_batch(num_rows: usize) -> RecordBatch {
+        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
+        let array = Int32Array::from_iter_values(0..num_rows as i32);
+        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
+    }
+    #[test]
+    fn test_biggest_coalesce_batch_size_none_default() {
+        // Test that default behavior (None) coalesces all batches
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+
+        // Push a large batch (1000 rows) - should be coalesced normally
+        let large_batch = create_test_batch(1000);
+        coalescer.push_batch(large_batch).unwrap();
+
+        // Should produce multiple batches of target size (100)
+        let mut output_batches = vec![];
+        while let Some(batch) = coalescer.next_completed_batch() {
+            output_batches.push(batch);
+        }
+
+        coalescer.finish_buffered_batch().unwrap();
+        while let Some(batch) = coalescer.next_completed_batch() {
+            output_batches.push(batch);
+        }
+
+        // Should have 10 batches of 100 rows each
+        assert_eq!(output_batches.len(), 10);
+        for batch in output_batches {
+            assert_eq!(batch.num_rows(), 100);
+        }
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_bypass_large_batch() {
+        // Test that batches larger than biggest_coalesce_batch_size bypass coalescing
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(500));
+
+        // Push a large batch (1000 rows) - should bypass coalescing
+        let large_batch = create_test_batch(1000);
+        coalescer.push_batch(large_batch.clone()).unwrap();
+
+        // Should have one completed batch immediately (the original large batch)
+        assert!(coalescer.has_completed_batch());
+        let output_batch = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output_batch.num_rows(), 1000);
+
+        // Should be no more completed batches
+        assert!(!coalescer.has_completed_batch());
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_coalesce_small_batch() {
+        // Test that batches smaller than biggest_coalesce_batch_size are coalesced normally
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(500));
+
+        // Push small batches that should be coalesced
+        let small_batch = create_test_batch(50);
+        coalescer.push_batch(small_batch.clone()).unwrap();
+
+        // Should not have completed batch yet (only 50 rows, target is 100)
+        assert!(!coalescer.has_completed_batch());
+        assert_eq!(coalescer.get_buffered_rows(), 50);
+
+        // Push another small batch
+        coalescer.push_batch(small_batch).unwrap();
+
+        // Now should have a completed batch (100 rows total)
+        assert!(coalescer.has_completed_batch());
+        let output_batch = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output_batch.num_rows(), 100);
+
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_equal_boundary() {
+        // Test behavior when batch size equals biggest_coalesce_batch_size
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(500));
+
+        // Push a batch exactly equal to the limit
+        let boundary_batch = create_test_batch(500);
+        coalescer.push_batch(boundary_batch).unwrap();
+
+        // Should be coalesced (not bypass) since it's equal, not greater
+        let mut output_count = 0;
+        while coalescer.next_completed_batch().is_some() {
+            output_count += 1;
+        }
+
+        coalescer.finish_buffered_batch().unwrap();
+        while coalescer.next_completed_batch().is_some() {
+            output_count += 1;
+        }
+
+        // Should have 5 batches of 100 rows each
+        assert_eq!(output_count, 5);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_first_large_then_consecutive_bypass() {
+        // Test bypass behavior across a run of large batches
+        // Pattern: small batches -> first large batch (coalesced with the buffer)
+        // -> later large batches (bypass, since the buffer is empty by then)
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(200));
+
+        let small_batch = create_test_batch(50);
+
+        // Push small batch first to create buffered data
+        coalescer.push_batch(small_batch).unwrap();
+        assert_eq!(coalescer.get_buffered_rows(), 50);
+        assert!(!coalescer.has_completed_batch());
+
+        // Push first large batch - should go through normal coalescing due to buffered data
+        let large_batch1 = create_test_batch(250);
+        coalescer.push_batch(large_batch1).unwrap();
+
+        // 50 + 250 = 300 -> 3 complete batches of 100, 0 rows buffered
+        let mut completed_batches = vec![];
+        while let Some(batch) = coalescer.next_completed_batch() {
+            completed_batches.push(batch);
+        }
+        assert_eq!(completed_batches.len(), 3);
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+
+        // Now push consecutive large batches - they should bypass
+        let large_batch2 = create_test_batch(300);
+        let large_batch3 = create_test_batch(400);
+
+        // Push second large batch - should bypass since the buffer is empty
+        coalescer.push_batch(large_batch2).unwrap();
+        assert!(coalescer.has_completed_batch());
+        let output = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output.num_rows(), 300); // bypassed with original size
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+
+        // Push third large batch - should also bypass
+        coalescer.push_batch(large_batch3).unwrap();
+        assert!(coalescer.has_completed_batch());
+        let output = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output.num_rows(), 400); // bypassed with original size
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_empty_batch() {
+        // Test that empty batches don't trigger the bypass logic
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(50));
+
+        let empty_batch = create_test_batch(0);
+        coalescer.push_batch(empty_batch).unwrap();
+
+        // Empty batch should be handled normally (no effect)
+        assert!(!coalescer.has_completed_batch());
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_with_buffered_data_no_bypass() {
+        // Test that when there is buffered data within the limit, large batches do NOT bypass
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(200));
+
+        // Add some buffered data first
+        let small_batch = create_test_batch(30);
+        coalescer.push_batch(small_batch.clone()).unwrap();
+        coalescer.push_batch(small_batch).unwrap();
+        assert_eq!(coalescer.get_buffered_rows(), 60);
+
+        // Push large batch that would normally bypass, but shouldn't because
+        // buffered_rows=60 is nonzero and within the limit
+        let large_batch = create_test_batch(250);
+        coalescer.push_batch(large_batch).unwrap();
+
+        // The large batch should be processed through normal coalescing logic
+        // Total: 60 (buffered) + 250 (new) = 310 rows
+        // Output: 3 complete batches of 100 rows each, 10 rows remain buffered
+
+        let mut completed_batches = vec![];
+        while let Some(batch) = coalescer.next_completed_batch() {
+            completed_batches.push(batch);
+        }
+
+        assert_eq!(completed_batches.len(), 3);
+        for batch in &completed_batches {
+            assert_eq!(batch.num_rows(), 100);
+        }
+        assert_eq!(coalescer.get_buffered_rows(), 10);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_zero_limit() {
+        // Test edge case where limit is 0 (all batches bypass when no buffered data)
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(0));
+
+        // Even a 1-row batch should bypass when there's no buffered data
+        let tiny_batch = create_test_batch(1);
+        coalescer.push_batch(tiny_batch).unwrap();
+
+        assert!(coalescer.has_completed_batch());
+        let output = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output.num_rows(), 1);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_bypass_only_when_no_buffer() {
+        // Test that bypass only occurs when buffered_rows == 0
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(200));
+
+        // First, push a large batch with no buffered data - should bypass
+        let large_batch = create_test_batch(300);
+        coalescer.push_batch(large_batch.clone()).unwrap();
+
+        assert!(coalescer.has_completed_batch());
+        let output = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output.num_rows(), 300); // bypassed
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+
+        // Now add some buffered data
+        let small_batch = create_test_batch(50);
+        coalescer.push_batch(small_batch).unwrap();
+        assert_eq!(coalescer.get_buffered_rows(), 50);
+
+        // Push the same large batch again - should NOT bypass this time (the
+        // buffered 50 rows are within the limit, so it merges normally)
+        coalescer.push_batch(large_batch).unwrap();
+
+        // Should process through normal coalescing: 50 + 300 = 350 rows
+        // Output: 3 complete batches of 100 rows, 50 rows buffered
+        let mut completed_batches = vec![];
+        while let Some(batch) = coalescer.next_completed_batch() {
+            completed_batches.push(batch);
+        }
+
+        assert_eq!(completed_batches.len(), 3);
+        for batch in &completed_batches {
+            assert_eq!(batch.num_rows(), 100);
+        }
+        assert_eq!(coalescer.get_buffered_rows(), 50);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_consecutive_large_batches_scenario() {
+        // Test a full scenario: 20, 20, 30, 700, 600, 700, 900, 700, 600
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            1000,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(500));
+
+        // Push small batches first
+        coalescer.push_batch(create_test_batch(20)).unwrap();
+        coalescer.push_batch(create_test_batch(20)).unwrap();
+        coalescer.push_batch(create_test_batch(30)).unwrap();
+
+        assert_eq!(coalescer.get_buffered_rows(), 70);
+        assert!(!coalescer.has_completed_batch());
+
+        // Push first large batch (700) - should coalesce due to buffered data
+        coalescer.push_batch(create_test_batch(700)).unwrap();
+
+        // 70 + 700 = 770 rows, not enough for 1000, so all stay buffered
+        assert_eq!(coalescer.get_buffered_rows(), 770);
+        assert!(!coalescer.has_completed_batch());
+
+        // Push second large batch (600) - buffered 770 rows exceed the limit,
+        // so the buffer is flushed and the 600 bypasses
+        coalescer.push_batch(create_test_batch(600)).unwrap();
+
+        // Should flush buffer (770 rows) and bypass the 600
+        let mut outputs = vec![];
+        while let Some(batch) = coalescer.next_completed_batch() {
+            outputs.push(batch);
+        }
+        assert_eq!(outputs.len(), 2); // one flushed buffer batch (770) + one bypassed (600)
+        assert_eq!(outputs[0].num_rows(), 770);
+        assert_eq!(outputs[1].num_rows(), 600);
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+
+        // Push remaining large batches - should all bypass
+        let remaining_batches = [700, 900, 700, 600];
+        for &size in &remaining_batches {
+            coalescer.push_batch(create_test_batch(size)).unwrap();
+
+            assert!(coalescer.has_completed_batch());
+            let output = coalescer.next_completed_batch().unwrap();
+            assert_eq!(output.num_rows(), size);
+            assert_eq!(coalescer.get_buffered_rows(), 0);
+        }
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_truly_consecutive_large_bypass() {
+        // Test truly consecutive large batches that should all bypass
+        // This test ensures buffer is completely empty between large batches
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(200));
+
+        // Push consecutive large batches with no prior buffered data
+        let large_batches = vec![
+            create_test_batch(300),
+            create_test_batch(400),
+            create_test_batch(350),
+            create_test_batch(500),
+        ];
+
+        let mut all_outputs = vec![];
+
+        for (i, large_batch) in large_batches.into_iter().enumerate() {
+            let expected_size = large_batch.num_rows();
+
+            // Buffer should be empty before each large batch
+            assert_eq!(
+                coalescer.get_buffered_rows(),
+                0,
+                "Buffer should be empty before batch {}",
+                i
+            );
+
+            coalescer.push_batch(large_batch).unwrap();
+
+            // Each large batch should bypass and produce exactly one output batch
+            assert!(
+                coalescer.has_completed_batch(),
+                "Should have completed batch after pushing batch {}",
+                i
+            );
+
+            let output = coalescer.next_completed_batch().unwrap();
+            assert_eq!(
+                output.num_rows(),
+                expected_size,
+                "Batch {} should have bypassed with original size",
+                i
+            );
+
+            // Should be no more batches and buffer should be empty
+            assert!(
+                !coalescer.has_completed_batch(),
+                "Should have no more completed batches after batch {}",
+                i
+            );
+            assert_eq!(
+                coalescer.get_buffered_rows(),
+                0,
+                "Buffer should be empty after batch {}",
+                i
+            );
+
+            all_outputs.push(output);
+        }
+
+        // Verify we got exactly 4 output batches with original sizes
+        assert_eq!(all_outputs.len(), 4);
+        assert_eq!(all_outputs[0].num_rows(), 300);
+        assert_eq!(all_outputs[1].num_rows(), 400);
+        assert_eq!(all_outputs[2].num_rows(), 350);
+        assert_eq!(all_outputs[3].num_rows(), 500);
+    }
+
+    #[test]
+    fn test_biggest_coalesce_batch_size_reset_consecutive_on_small_batch() {
+        // Test that buffered small batches stop the bypass for the next large batch
+        let mut coalescer = BatchCoalescer::new(
+            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
+            100,
+        );
+        coalescer.set_biggest_coalesce_batch_size(Some(200));
+
+        // Push first large batch - should bypass (no buffered data)
+        coalescer.push_batch(create_test_batch(300)).unwrap();
+        let output = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output.num_rows(), 300);
+
+        // Push second large batch - should bypass (consecutive)
+        coalescer.push_batch(create_test_batch(400)).unwrap();
+        let output = coalescer.next_completed_batch().unwrap();
+        assert_eq!(output.num_rows(), 400);
+
+        // Push small batch - the buffer is no longer empty
+        coalescer.push_batch(create_test_batch(50)).unwrap();
+        assert_eq!(coalescer.get_buffered_rows(), 50);
+
+        // Push large batch again - should NOT bypass due to buffered data
+        coalescer.push_batch(create_test_batch(350)).unwrap();
+
+        // Should coalesce: 50 + 350 = 400 -> 4 complete batches of 100
+        let mut outputs = vec![];
+        while let Some(batch) = coalescer.next_completed_batch() {
+            outputs.push(batch);
+        }
+        assert_eq!(outputs.len(), 4);
+        for batch in outputs {
+            assert_eq!(batch.num_rows(), 100);
+        }
+        assert_eq!(coalescer.get_buffered_rows(), 0);
+    }
 }

