andygrove opened a new pull request, #3845:
URL: https://github.com/apache/datafusion-comet/pull/3845

   ## Which issue does this PR close?
   
   N/A - experimental feature
   
   ## Rationale for this change
   
   **EXPERIMENTAL - not ready for merge**
   
   The current `MultiPartitionShuffleRepartitioner` buffers all input rows in 
memory before writing partitioned output during `shuffle_write`. This can cause 
high memory usage and frequent spilling for large datasets.
   
   `ImmediateModePartitioner` takes a different approach: it writes partitioned 
IPC blocks immediately as batches arrive, rather than deferring all writes to 
`shuffle_write`. This reduces peak memory usage at the cost of more frequent 
(but smaller) writes.
   
   ## What changes are included in this PR?
   
   ### New: `ImmediateModePartitioner`
   - **`PartitionOutputStream`**: Per-partition output stream that serializes 
Arrow IPC batches into an in-memory `Vec<u8>` buffer with compression. Block 
format matches `ShuffleBlockWriter` exactly for compatibility with existing 
readers.
   - **Batch coalescing**: Uses Arrow's `BatchCoalescer` to accumulate small 
sub-batches (from partition splitting) into `batch_size`-row blocks before IPC 
serialization, reducing per-block schema overhead.
   - **Spill support**: When the memory pool cannot grow, all partition buffers 
are flushed and spilled to per-partition temp files. On `shuffle_write`, spill 
file contents and remaining in-memory buffers are concatenated into the final 
data + index files.
   - **Memory accounting**: IPC buffer growth is tracked incrementally. 
Coalescer memory is reserved upfront based on schema-estimated row size × 
batch_size × num_partitions.
   
   ### Config: `spark.comet.exec.shuffle.partitionerMode`
   - `"immediate"` (default): Uses `ImmediateModePartitioner`
   - `"buffered"`: Uses the original `MultiPartitionShuffleRepartitioner`
   
   ### Benchmark: `--mode` flag on `shuffle_bench`
   - Allows comparing immediate vs buffered modes with the standalone benchmark 
tool
   
   ## How are these changes tested?
   
   8 unit tests covering:
   - IPC round-trip with all compression codecs (None, Lz4, Zstd, Snappy)
   - Batch coalescing behavior (small batches merged before serialization)
   - Empty batch handling
   - Buffer drain and reuse after flush
   - Hash partitioning correctness
   - End-to-end shuffle_write (data + index file format validation)
   - Spill under memory pressure (256-byte `GreedyMemoryPool`)
   - Block format compatibility with existing `read_ipc_compressed`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to