andygrove opened a new pull request, #3441:
URL: https://github.com/apache/datafusion-comet/pull/3441

   ## Which issue does this PR close?
   
   
   Closes #.
   
   ## Rationale for this change
   
   In the multi-partition shuffle path, each small batch becomes its own IPC block and carries the full schema metadata overhead (~500-2000 bytes). Small batches arise from:
   
   - **Spills under memory pressure**: with 200 partitions and frequent spills, each spill event may write only ~40 rows per partition, and each of those slices becomes a separate IPC block with the full schema
   - **Trailing batches**: the iterator chunks rows into batches of `batch_size` (8192), so the last chunk is often much smaller
   - **Small partitions**: skewed data or aggregation results may leave very few rows per partition
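To make the spill case concrete, here is a small back-of-the-envelope sketch. It assumes, as an illustration only, that an 8192-row input batch is spread evenly across 200 partitions and that every partition's slice is written as its own IPC block on each spill. The 500 and 2000 byte figures are the per-block metadata range quoted above. The helper names are hypothetical.

```rust
/// Rows each partition receives when `batch_rows` rows are spread evenly
/// across `partitions` (integer division; an idealized assumption).
fn rows_per_partition(batch_rows: usize, partitions: usize) -> usize {
    batch_rows / partitions
}

/// Schema/metadata bytes written in one spill event if every partition
/// emits its own IPC block with the given per-block overhead.
fn metadata_bytes_per_spill(partitions: usize, overhead_per_block: usize) -> usize {
    partitions * overhead_per_block
}

fn main() {
    let partitions = 200;
    let rows = rows_per_partition(8192, partitions);
    // Lower and upper bound of the ~500-2000 byte overhead range.
    let low = metadata_bytes_per_spill(partitions, 500);
    let high = metadata_bytes_per_spill(partitions, 2000);
    println!("~{rows} rows per partition; {low}-{high} metadata bytes per spill");
}
```

Under these assumptions, a single spill writes about 40 rows per partition while paying roughly 100 KB to 400 KB of schema metadata across all 200 partitions.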
   
   The single-partition path (`SinglePartitionShufflePartitioner`) already 
coalesces batches before writing. The multi-partition path does not.
   
   ## What changes are included in this PR?
   
   Adds batch coalescing directly to `BufBatchWriter`. Instead of serializing every batch into an IPC block immediately, batches accumulate in a `Vec<RecordBatch>` and are combined with `concat_batches` once the accumulated row count reaches `batch_size`. Because the coalescing lives in the writer, both `shuffle_write_partition` and `PartitionWriter::spill` benefit automatically.
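The accumulate-then-concatenate flow can be modeled in plain Rust. This is a simplified, std-only sketch, not the PR's code. `MiniBatch` is a hypothetical stand-in for Arrow's `RecordBatch`, `concat` plays the role of `concat_batches`, and `blocks` stands in for serialized IPC blocks. The field and method names mirror the PR's `pending_batches`, `pending_rows`, `batch_size`, `write()`, `flush_pending()`, and `flush()`, but their bodies here are assumptions. The flush condition (`pending_rows >= batch_size`) follows the description's "when accumulated rows reach `batch_size`".

```rust
/// Hypothetical stand-in for an Arrow `RecordBatch`: one `Vec<i32>` per column.
#[derive(Clone, Debug, PartialEq)]
struct MiniBatch {
    columns: Vec<Vec<i32>>,
}

impl MiniBatch {
    fn num_rows(&self) -> usize {
        self.columns.first().map_or(0, |c| c.len())
    }
}

/// Column-wise concatenation, playing the role of `concat_batches`.
/// Assumes every batch has the same number of columns (same schema).
fn concat(batches: &[MiniBatch]) -> MiniBatch {
    let num_cols = batches.first().map_or(0, |b| b.columns.len());
    let mut columns = vec![Vec::new(); num_cols];
    for batch in batches {
        for (dst, src) in columns.iter_mut().zip(&batch.columns) {
            dst.extend_from_slice(src);
        }
    }
    MiniBatch { columns }
}

/// Sketch of a writer that buffers small batches and emits combined blocks.
struct CoalescingWriter {
    batch_size: usize,
    pending_batches: Vec<MiniBatch>,
    pending_rows: usize,
    blocks: Vec<MiniBatch>,
}

impl CoalescingWriter {
    fn new(batch_size: usize) -> Self {
        Self { batch_size, pending_batches: Vec::new(), pending_rows: 0, blocks: Vec::new() }
    }

    /// Buffer the batch; emit one combined block once enough rows accumulate.
    fn write(&mut self, batch: &MiniBatch) {
        self.pending_rows += batch.num_rows();
        self.pending_batches.push(batch.clone());
        if self.pending_rows >= self.batch_size {
            self.flush_pending();
        }
    }

    /// Concatenate whatever is buffered into a single block.
    fn flush_pending(&mut self) {
        if self.pending_batches.is_empty() {
            return;
        }
        let combined = concat(&self.pending_batches);
        self.blocks.push(combined);
        self.pending_batches.clear();
        self.pending_rows = 0;
    }

    /// Final flush so a trailing partial group is not lost.
    fn flush(&mut self) {
        self.flush_pending();
    }
}

fn main() {
    let small = MiniBatch { columns: vec![vec![1; 50], vec![2; 50]] };
    let mut writer = CoalescingWriter::new(120);
    for _ in 0..5 {
        writer.write(&small);
    }
    writer.flush();
    let sizes: Vec<usize> = writer.blocks.iter().map(|b| b.num_rows()).collect();
    println!("{sizes:?}");
}
```

With `batch_size = 120` and five 50-row batches, the model emits a 150-row block after the third batch and a 100-row block on the final flush. With `batch_size = 1`, every batch becomes its own block, which mirrors the no-coalescing configuration in the test.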
   
   `RecordBatch::clone()` is cheap (it only increments `Arc` reference counts and copies no column data), so accumulation adds minimal overhead. The single-partition path already feeds large batches, so coalescing is effectively a no-op there.
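Arrow's `RecordBatch` holds its columns as `ArrayRef` values (`Arc<dyn Array>`), so a clone copies pointers rather than buffers. The std-only sketch below illustrates that property with a hypothetical `SharedBatch` whose columns are `Arc<Vec<i32>>`. It is a model of the sharing behavior, not the arrow type itself.

```rust
use std::sync::Arc;

/// Hypothetical model of a batch with reference-counted columns,
/// analogous to Arrow's `Vec<ArrayRef>` (where `ArrayRef = Arc<dyn Array>`).
#[derive(Clone)]
struct SharedBatch {
    columns: Vec<Arc<Vec<i32>>>,
}

fn main() {
    let original = SharedBatch {
        columns: vec![
            Arc::new((0..8192).collect::<Vec<i32>>()),
            Arc::new(vec![0; 8192]),
        ],
    };
    // Cloning copies the small Vec of Arc pointers and bumps each reference
    // count; the 8192-element column buffers themselves are not copied.
    let pending = original.clone();

    for (a, b) in original.columns.iter().zip(&pending.columns) {
        // Both batches point at the very same column allocation.
        assert!(Arc::ptr_eq(a, b));
    }
    println!("strong count of column 0: {}", Arc::strong_count(&original.columns[0]));
}
```

Running it prints a strong count of 2 for the first column: one reference from `original` and one from `pending`, with a single shared buffer.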
   
   ### Files changed
   
   | File | Change |
   |---|---|
   | `writers/buf_batch_writer.rs` | Add `pending_batches`, `pending_rows`, and `batch_size` fields; coalescing in `write()`; `flush_pending()`; `write_batch_to_buffer()`; update `flush()` signature |
   | `writers/partition_writer.rs` | Add `batch_size` param to `spill()` |
   | `partitioners/multi_partition.rs` | Add `batch_size` param to `shuffle_write_partition()`; pass it through at all call sites |
   | `partitioners/single_partition.rs` | Update `BufBatchWriter::new()` and `flush()` call signatures |
   | `shuffle_writer.rs` | Add coalescing size comparison test |
   
   ## How are these changes tested?
   
   New test `test_batch_coalescing_reduces_size` writes 100 small batches (50 rows × 20 int32 columns each) through `BufBatchWriter`:
   - With `batch_size=8192` (coalescing active) → fewer, larger IPC blocks
   - With `batch_size=1` (no coalescing) → many small IPC blocks
   - Asserts that the coalesced output is smaller
   - Verifies that both outputs round-trip correctly via `read_ipc_compressed`
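The expected gap between the two runs can be estimated by hand. This sketch assumes, as the description implies, that a flush fires once pending rows reach `batch_size` and that a final flush writes any remainder. The helper name `ipc_blocks` is hypothetical. The sketch counts blocks only and ignores compression, so it says nothing about the exact byte sizes the test compares.

```rust
/// Number of IPC blocks written for `num_batches` uniform batches of
/// `rows_per_batch` rows, assuming a flush fires as soon as the pending row
/// count reaches `batch_size`, plus one final flush for any remainder.
fn ipc_blocks(num_batches: usize, rows_per_batch: usize, batch_size: usize) -> usize {
    assert!(rows_per_batch > 0, "rows_per_batch must be non-zero");
    if num_batches == 0 {
        return 0;
    }
    // Batches needed before the pending rows reach `batch_size` (at least one).
    let per_block = batch_size.div_ceil(rows_per_batch).max(1);
    num_batches.div_ceil(per_block)
}

fn main() {
    let (batches, rows, cols) = (100usize, 50usize, 20usize);
    let coalesced = ipc_blocks(batches, rows, 8192);
    let uncoalesced = ipc_blocks(batches, rows, 1);
    // Raw int32 payload, identical in both runs.
    let raw_bytes = batches * rows * cols * std::mem::size_of::<i32>();
    // Blocks avoided times the ~500-2000 byte per-block overhead range.
    let avoided = uncoalesced - coalesced;
    println!("{coalesced} vs {uncoalesced} blocks; {raw_bytes} raw payload bytes");
    println!("metadata avoided: {} to {} bytes", avoided * 500, avoided * 2000);
}
```

With the test's parameters, the 5000 total rows never reach 8192, so the coalesced run writes a single block on the final flush, compared with 100 blocks when `batch_size=1`. That avoids 99 copies of the schema metadata on a 400,000-byte raw payload.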
   
   All 12 existing shuffle tests continue to pass.
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.