zhuqi-lucas commented on code in PR #8146:
URL: https://github.com/apache/arrow-rs/pull/8146#discussion_r2280889409
##########
arrow-select/src/coalesce.rs:
##########
@@ -236,6 +262,13 @@ impl BatchCoalescer {
     /// assert_eq!(completed_batch, expected_batch);
     /// ```
     pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
+        if let Some(limit) = self.biggest_coalesce_batch_size {

Review Comment:
Current behaviour for this PR:

```rust
// Large batch bypass optimization:
// When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
// we can avoid expensive split-and-merge operations by passing it through directly.
//
// IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
// is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
// If not set (None), ALL batches follow normal coalescing behavior regardless of size.
//
// =============================================================================
// CASE 1: No buffer + large batch → Direct bypass
// =============================================================================
// Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
// Input sequence: [600, 1200, 300]
//
// With biggest_coalesce_batch_size=Some(500) (optimization enabled):
// 600  → large batch detected! buffered_rows=0 → Case 1: direct bypass
//      → output: [600] (bypass, preserves large batch)
// 1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
//      → output: [1200] (bypass, preserves large batch)
// 300  → normal batch, buffer: [300]
// Result: [600], [1200], [300] - large batches preserved, mixed sizes
//
// =============================================================================
// CASE 2: Buffer too large + large batch → Flush first, then bypass
// =============================================================================
// This case prevents creating extremely large merged batches that would
// significantly exceed both target_batch_size and biggest_coalesce_batch_size.
//
// Example 1: Buffer exceeds limit before large batch arrives
// target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
// Input: [350, 200, 800]
//
// Step 1: push_batch([350])
//   → batch_size=350 <= 400, normal path
//   → buffer: [350], buffered_rows=350
//
// Step 2: push_batch([200])
//   → batch_size=200 <= 400, normal path
//   → buffer: [350, 200], buffered_rows=550
//
// Step 3: push_batch([800])
//   → batch_size=800 > 400, large batch path
//   → buffered_rows=550 > 400 → Case 2: flush first
//   → flush: output [550] (combined [350, 200])
//   → then bypass: output [800]
// Result: [550], [800] - buffer flushed to prevent oversized merge
//
// Example 2: Multiple small batches accumulate before large batch
// target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
// Input: [150, 100, 80, 900]
//
// Steps 1-3: Accumulate small batches
//   150 → buffer: [150], buffered_rows=150
//   100 → buffer: [150, 100], buffered_rows=250
//   80  → buffer: [150, 100, 80], buffered_rows=330
//
// Step 4: push_batch([900])
//   → batch_size=900 > 300, large batch path
//   → buffered_rows=330 > 300 → Case 2: flush first
//   → flush: output [330] (combined [150, 100, 80])
//   → then bypass: output [900]
// Result: [330], [900] - prevents merge into [1230] which would be too large
//
// =============================================================================
// CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
// =============================================================================
// When the buffer is small enough, we still merge to maintain efficiency.
// Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
// Input: [300, 1200]
//
// Step 1: push_batch([300])
//   → batch_size=300 <= 500, normal path
//   → buffer: [300], buffered_rows=300
//
// Step 2: push_batch([1200])
//   → batch_size=1200 > 500, large batch path
//   → buffered_rows=300 <= 500 → Case 3: normal merge
//   → buffer: [300, 1200] (1500 total)
//   → 1500 > target_batch_size → split: output [1000], buffer [500]
// Result: [1000], [500] - normal split/merge behavior maintained
//
// =============================================================================
// Comparison: Default vs Optimized Behavior
// =============================================================================
// target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
// Input: [600, 1200, 300]
//
// DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
// 600  → buffer: [600]
// 1200 → buffer: [600, 1200] (1800 rows total)
//      → split: output [1000 rows], buffer [800 rows remaining]
// 300  → buffer: [800, 300] (1100 rows total)
//      → split: output [1000 rows], buffer [100 rows remaining]
// Result: [1000], [1000], [100] - all outputs respect target_batch_size
//
// OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
// 600  → Case 1: direct bypass → output: [600]
// 1200 → Case 1: direct bypass → output: [1200]
// 300  → normal path → buffer: [300]
// Result: [600], [1200], [300] - large batches preserved
//
// =============================================================================
// Benefits and Trade-offs
// =============================================================================
// Benefits of the optimization:
// - Large batches stay intact (better for downstream vectorized processing)
// - Fewer split/merge operations (better CPU performance)
// - More predictable memory usage patterns
// - Maintains streaming efficiency while preserving batch boundaries
//
// Trade-offs:
// - Output batch sizes become variable (not always target_batch_size)
// - May produce smaller partial batches when flushing before large batches
// - Requires tuning the biggest_coalesce_batch_size parameter for optimal performance
//
// TODO: for unsorted batches, we may be able to filter out all large batches
// and coalesce all the small batches together?
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
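The three cases walked through in the review comment can be checked against a small, self-contained model. This is only a sketch that tracks row counts (`usize`) instead of real `RecordBatch` data; `CoalesceModel` and its fields are illustrative names, not the arrow-rs API, and the control flow simply mirrors the behaviour described above.

```rust
/// Simplified model of the large-batch bypass logic (row counts only).
/// Not the arrow-rs implementation; names are illustrative.
struct CoalesceModel {
    target_batch_size: usize,
    biggest_coalesce_batch_size: Option<usize>,
    buffered_rows: usize,
    outputs: Vec<usize>, // sizes of completed output batches
}

impl CoalesceModel {
    fn new(target_batch_size: usize, biggest_coalesce_batch_size: Option<usize>) -> Self {
        Self {
            target_batch_size,
            biggest_coalesce_batch_size,
            buffered_rows: 0,
            outputs: Vec::new(),
        }
    }

    /// Mirrors the three cases described in the review comment.
    fn push_batch(&mut self, rows: usize) {
        if let Some(limit) = self.biggest_coalesce_batch_size {
            if rows > limit {
                // Case 2: buffer already exceeds the limit -> flush it first.
                if self.buffered_rows > limit {
                    self.outputs.push(self.buffered_rows);
                    self.buffered_rows = 0;
                }
                // Case 1 (possibly right after the flush): empty buffer -> bypass.
                if self.buffered_rows == 0 {
                    self.outputs.push(rows);
                    return;
                }
                // Case 3: small non-empty buffer -> fall through to normal merging.
            }
        }
        // Normal path: accumulate rows and emit target-sized batches.
        self.buffered_rows += rows;
        while self.buffered_rows >= self.target_batch_size {
            self.outputs.push(self.target_batch_size);
            self.buffered_rows -= self.target_batch_size;
        }
    }
}

fn main() {
    // Optimized behaviour: batches larger than 500 rows bypass the buffer intact.
    let mut opt = CoalesceModel::new(1000, Some(500));
    for rows in [600, 1200, 300] {
        opt.push_batch(rows);
    }
    assert_eq!(opt.outputs, vec![600, 1200]); // 300 rows remain buffered
    assert_eq!(opt.buffered_rows, 300);

    // Default behaviour: everything is split/merged toward the target size.
    let mut def = CoalesceModel::new(1000, None);
    for rows in [600, 1200, 300] {
        def.push_batch(rows);
    }
    assert_eq!(def.outputs, vec![1000, 1000]); // 100 rows remain buffered
    assert_eq!(def.buffered_rows, 100);

    println!("model matches the worked examples");
}
```

Running this reproduces the "Comparison: Default vs Optimized Behavior" example above, with the trailing partial batch still buffered until a final flush.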