zhuqi-lucas commented on code in PR #8146:
URL: https://github.com/apache/arrow-rs/pull/8146#discussion_r2280889409


##########
arrow-select/src/coalesce.rs:
##########
@@ -236,6 +262,13 @@ impl BatchCoalescer {
     /// assert_eq!(completed_batch, expected_batch);
     /// ```
     pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
+        if let Some(limit) = self.biggest_coalesce_batch_size {

Review Comment:
   Current behaviour for this PR:
   
   ```rust
        // Large batch bypass optimization:
        // When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
        // we can avoid expensive split-and-merge operations by passing it through directly.
        //
        // IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
        // is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
        // If not set (None), ALL batches follow normal coalescing behavior regardless of size.
   
        // =============================================================================
        // CASE 1: No buffer + large batch → Direct bypass
        // =============================================================================
        // Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
        // Input sequence: [600, 1200, 300]
        //
        // With biggest_coalesce_batch_size=Some(500) (optimization enabled):
        //   600 → large batch detected! buffered_rows=0 → Case 1: direct bypass
        //        → output: [600] (bypass, preserves large batch)
        //   1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
        //         → output: [1200] (bypass, preserves large batch)
        //   300 → normal batch, buffer: [300]
        //   Result: [600], [1200], [300] - large batches preserved, mixed sizes
   
        // =============================================================================
        // CASE 2: Buffer too large + large batch → Flush first, then bypass
        // =============================================================================
        // This case prevents creating extremely large merged batches that would
        // significantly exceed both target_batch_size and biggest_coalesce_batch_size.
           //
           // Example 1: Buffer exceeds limit before large batch arrives
           // target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
           // Input: [350, 200, 800]
           //
           // Step 1: push_batch([350])
           //   → batch_size=350 <= 400, normal path
           //   → buffer: [350], buffered_rows=350
           //
           // Step 2: push_batch([200])
           //   → batch_size=200 <= 400, normal path
           //   → buffer: [350, 200], buffered_rows=550
           //
           // Step 3: push_batch([800])
           //   → batch_size=800 > 400, large batch path
           //   → buffered_rows=550 > 400 → Case 2: flush first
           //   → flush: output [550] (combined [350, 200])
           //   → then bypass: output [800]
           //   Result: [550], [800] - buffer flushed to prevent oversized merge
           //
           // Example 2: Multiple small batches accumulate before large batch
           // target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
           // Input: [150, 100, 80, 900]
           //
           // Step 1-3: Accumulate small batches
           //   150 → buffer: [150], buffered_rows=150
           //   100 → buffer: [150, 100], buffered_rows=250
           //   80  → buffer: [150, 100, 80], buffered_rows=330
           //
           // Step 4: push_batch([900])
           //   → batch_size=900 > 300, large batch path
           //   → buffered_rows=330 > 300 → Case 2: flush first
           //   → flush: output [330] (combined [150, 100, 80])
           //   → then bypass: output [900]
        //   Result: [330], [900] - prevents merge into [1230], which would be too large
   
        // =============================================================================
        // CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
        // =============================================================================
        // When the buffer is small enough, we still merge to maintain efficiency.
        // Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
           // Input: [300, 1200]
           //
           // Step 1: push_batch([300])
           //   → batch_size=300 <= 500, normal path
           //   → buffer: [300], buffered_rows=300
           //
           // Step 2: push_batch([1200])
           //   → batch_size=1200 > 500, large batch path
           //   → buffered_rows=300 <= 500 → Case 3: normal merge
           //   → buffer: [300, 1200] (1500 total)
           //   → 1500 > target_batch_size → split: output [1000], buffer [500]
           //   Result: [1000], [500] - normal split/merge behavior maintained
   
        // =============================================================================
        // Comparison: Default vs Optimized Behavior
        // =============================================================================
           // target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
           // Input: [600, 1200, 300]
           //
           // DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
           //   600 → buffer: [600]
           //   1200 → buffer: [600, 1200] (1800 rows total)
           //         → split: output [1000 rows], buffer [800 rows remaining]
           //   300 → buffer: [800, 300] (1100 rows total)
           //        → split: output [1000 rows], buffer [100 rows remaining]
        //   Result: [1000], [1000], [100] - all outputs respect target_batch_size
           //
           // OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
           //   600 → Case 1: direct bypass → output: [600]
           //   1200 → Case 1: direct bypass → output: [1200]
           //   300 → normal path → buffer: [300]
           //   Result: [600], [1200], [300] - large batches preserved
   
        // =============================================================================
        // Benefits and Trade-offs
        // =============================================================================
        // Benefits of the optimization:
        // - Large batches stay intact (better for downstream vectorized processing)
           // - Fewer split/merge operations (better CPU performance)
           // - More predictable memory usage patterns
           // - Maintains streaming efficiency while preserving batch boundaries
           //
           // Trade-offs:
        // - Output batch sizes become variable (not always target_batch_size)
        // - May produce smaller partial batches when flushing before large batches
        // - Requires tuning the biggest_coalesce_batch_size parameter for optimal performance
   
        // TODO: for unsorted batches, we may be able to filter out all large batches
        // and coalesce all the small batches together?
   ```
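
   The three-case dispatch described above could be sketched roughly as follows. This is a standalone illustration that tracks only row counts, not real `RecordBatch` data; the `Coalescer` struct, `push` method, and `output` vector are hypothetical names for this sketch, not the actual arrow-rs `BatchCoalescer` API.

   ```rust
   /// Toy model of the coalescing behavior: batches are just row counts.
   struct Coalescer {
       target_batch_size: usize,
       biggest_coalesce_batch_size: Option<usize>,
       buffered_rows: usize,
       output: Vec<usize>, // sizes of emitted batches, in order
   }

   impl Coalescer {
       fn new(target: usize, limit: Option<usize>) -> Self {
           Self {
               target_batch_size: target,
               biggest_coalesce_batch_size: limit,
               buffered_rows: 0,
               output: Vec::new(),
           }
       }

       fn push(&mut self, batch_size: usize) {
           if let Some(limit) = self.biggest_coalesce_batch_size {
               if batch_size > limit {
                   // Case 2: the buffer itself exceeds the limit → flush it first.
                   if self.buffered_rows > limit {
                       self.output.push(self.buffered_rows);
                       self.buffered_rows = 0;
                   }
                   // Case 1 (and the tail of Case 2): empty buffer → direct bypass.
                   // Case 3 (small non-empty buffer) falls through to normal merging.
                   if self.buffered_rows == 0 {
                       self.output.push(batch_size);
                       return;
                   }
               }
           }
           // Normal coalescing: accumulate, splitting off target-sized batches.
           self.buffered_rows += batch_size;
           while self.buffered_rows >= self.target_batch_size {
               self.output.push(self.target_batch_size);
               self.buffered_rows -= self.target_batch_size;
           }
       }
   }
   ```

   Feeding this sketch the worked examples from the comment reproduces the traces: `[350, 200, 800]` with a limit of 400 flushes `[550]` and bypasses `[800]` (Case 2), while `[300, 1200]` with a limit of 500 still merges and splits into `[1000]` with 500 rows left buffered (Case 3).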



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
