andygrove opened a new pull request, #3845: URL: https://github.com/apache/datafusion-comet/pull/3845
## Which issue does this PR close?

N/A - experimental feature

## Rationale for this change

**EXPERIMENTAL - not ready for merge**

The current `MultiPartitionShuffleRepartitioner` buffers all input rows in memory before writing partitioned output during `shuffle_write`. This can cause high memory usage and frequent spilling for large datasets.

`ImmediateModePartitioner` takes a different approach. It writes partitioned IPC blocks as batches arrive instead of deferring all writes to `shuffle_write`. This reduces peak memory usage at the cost of more frequent, smaller writes.

## What changes are included in this PR?

### New: `ImmediateModePartitioner`

- **`PartitionOutputStream`**: Per-partition output stream that serializes record batches as compressed Arrow IPC into an in-memory `Vec<u8>` buffer. The block format matches `ShuffleBlockWriter` exactly, so existing readers remain compatible.
- **Batch coalescing**: Uses Arrow's `BatchCoalescer` to accumulate the small sub-batches produced by partition splitting into `batch_size`-row blocks before IPC serialization, reducing per-block schema overhead.
- **Spill support**: When the memory pool cannot grow, all partition buffers are flushed and spilled to per-partition temp files. On `shuffle_write`, spill file contents and remaining in-memory buffers are concatenated into the final data and index files.
- **Memory accounting**: IPC buffer growth is tracked incrementally. Coalescer memory is reserved upfront as schema-estimated row size × `batch_size` × `num_partitions`.

### Config: `spark.comet.exec.shuffle.partitionerMode`

- `"immediate"` (default): uses `ImmediateModePartitioner`
- `"buffered"`: uses the original `MultiPartitionShuffleRepartitioner`

### Benchmark: `--mode` flag on `shuffle_bench`

- Allows comparing immediate and buffered modes with the standalone benchmark tool

## How are these changes tested?
8 unit tests covering:

- IPC round-trip with all compression codecs (None, Lz4, Zstd, Snappy)
- Batch coalescing behavior (small batches merged before serialization)
- Empty batch handling
- Buffer drain and reuse after flush
- Hash partitioning correctness
- End-to-end `shuffle_write` (data and index file format validation)
- Spill under memory pressure (256-byte `GreedyMemoryPool`)
- Block format compatibility with existing `read_ipc_compressed`
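The core idea of immediate mode, splitting each incoming batch by partition as soon as it arrives instead of buffering every row until `shuffle_write`, can be sketched in plain Rust. This is a minimal sketch, not the PR's code: rows are modeled as `i64` keys rather than Arrow `RecordBatch`es, and `partition_id` and `split_batch` are hypothetical helpers. It uses `std`'s `DefaultHasher` purely for illustration; Spark's own hash partitioning uses Murmur3, so these partition ids would not match Spark's.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper (not from the PR): map a key to a partition id.
/// DefaultHasher is an illustrative stand-in, NOT Spark-compatible Murmur3.
pub fn partition_id(key: i64, num_partitions: usize) -> usize {
    assert!(num_partitions > 0, "num_partitions must be positive");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_partitions as u64) as usize
}

/// Hypothetical helper (not from the PR): split one incoming batch into
/// per-partition sub-batches right away, preserving row order within each
/// partition. In immediate mode these sub-batches would be handed to the
/// per-partition output streams instead of being kept until shuffle_write.
pub fn split_batch(keys: &[i64], num_partitions: usize) -> Vec<Vec<i64>> {
    let mut parts: Vec<Vec<i64>> = vec![Vec::new(); num_partitions];
    for &key in keys {
        parts[partition_id(key, num_partitions)].push(key);
    }
    parts
}
```

Because each sub-batch is usually much smaller than the input batch, emitting them directly would produce many tiny IPC blocks, which is the problem the batch coalescing step addresses.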
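The **Batch coalescing** step can be illustrated with a simplified, hypothetical `RowCoalescer` that plays the role Arrow's `BatchCoalescer` plays in the PR: it accumulates small per-partition sub-batches and only emits a block once `target_rows` rows are available, so fewer, larger blocks (each with one schema header) would be serialized. Rows are modeled as `i64` values, and the type and method names are assumptions for illustration, not Arrow's API.

```rust
/// Hypothetical stand-in for a per-partition coalescer (not Arrow's API).
/// Rows are modeled as i64 values instead of Arrow record batches.
pub struct RowCoalescer {
    target_rows: usize,
    buffered: Vec<i64>,
    completed: Vec<Vec<i64>>,
}

impl RowCoalescer {
    pub fn new(target_rows: usize) -> Self {
        assert!(target_rows > 0, "target_rows must be positive");
        Self {
            target_rows,
            buffered: Vec::with_capacity(target_rows),
            completed: Vec::new(),
        }
    }

    /// Append a (possibly tiny) sub-batch; emit a full block each time
    /// exactly `target_rows` rows have accumulated.
    pub fn push(&mut self, rows: &[i64]) {
        let mut rest = rows;
        while !rest.is_empty() {
            let room = self.target_rows - self.buffered.len();
            let take = room.min(rest.len());
            self.buffered.extend_from_slice(&rest[..take]);
            rest = &rest[take..];
            if self.buffered.len() == self.target_rows {
                let full =
                    std::mem::replace(&mut self.buffered, Vec::with_capacity(self.target_rows));
                self.completed.push(full);
            }
        }
    }

    /// Emit any partial block, e.g. before spilling or at shuffle_write.
    pub fn finish(&mut self) {
        if !self.buffered.is_empty() {
            let partial = std::mem::take(&mut self.buffered);
            self.completed.push(partial);
        }
    }

    /// Take the completed blocks; each would become one IPC block.
    pub fn drain_completed(&mut self) -> Vec<Vec<i64>> {
        std::mem::take(&mut self.completed)
    }
}
```

With `target_rows = 4`, pushing `[1, 2]`, `[3]`, and `[4, 5, 6]` yields one full block `[1, 2, 3, 4]`; `finish` then flushes the remaining `[5, 6]`, and pushing an empty slice emits nothing.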
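The **Memory accounting** and **Spill support** bullets describe two charges against the pool: a fixed upfront reservation for the coalescers, and incremental charges as IPC buffers grow, with a spill when the pool refuses to grow. The sketch below rests on stated assumptions: `ToyPool` is a hypothetical fixed-limit pool standing in for a DataFusion memory pool (it is not DataFusion's API), `coalescer_reservation` and `track_ipc_growth` are illustrative names, and the estimated row size is passed in by the caller rather than derived from a schema.

```rust
/// Hypothetical fixed-limit pool (a toy stand-in, not DataFusion's API).
pub struct ToyPool {
    limit: usize,
    used: usize,
}

impl ToyPool {
    pub fn new(limit: usize) -> Self {
        Self { limit, used: 0 }
    }

    pub fn used(&self) -> usize {
        self.used
    }

    /// Reserve `bytes` if they fit under the limit; otherwise leave usage unchanged.
    pub fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        match self.used.checked_add(bytes) {
            Some(total) if total <= self.limit => {
                self.used = total;
                Ok(())
            }
            _ => Err(format!("cannot grow by {bytes} bytes: {} of {} used", self.used, self.limit)),
        }
    }

    /// Release bytes, e.g. after partition buffers have been spilled to disk.
    pub fn shrink(&mut self, bytes: usize) {
        self.used = self.used.saturating_sub(bytes);
    }
}

/// Hypothetical: upfront coalescer reservation computed as
/// estimated row size x batch_size x num_partitions (None on overflow).
pub fn coalescer_reservation(est_row_bytes: usize, batch_size: usize, num_partitions: usize) -> Option<usize> {
    est_row_bytes.checked_mul(batch_size)?.checked_mul(num_partitions)
}

#[derive(Debug, PartialEq, Eq)]
pub enum Growth {
    Reserved,
    MustSpill,
}

/// Hypothetical incremental accounting: charge only the bytes a partition's
/// IPC buffer grew by; a refused reservation signals that buffers must spill.
pub fn track_ipc_growth(pool: &mut ToyPool, old_len: usize, new_len: usize) -> Growth {
    match pool.try_grow(new_len.saturating_sub(old_len)) {
        Ok(()) => Growth::Reserved,
        Err(_) => Growth::MustSpill,
    }
}
```

With illustrative values of 64 bytes per row, `batch_size` 8192, and 200 partitions, the upfront reservation is 104,857,600 bytes (100 MiB), which shows why the coalescer reservation alone can be significant at high partition counts. A 256-byte pool, like the one in the spill test, fills up after a few small IPC blocks and forces a spill.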
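At `shuffle_write`, each partition's spilled bytes and its remaining in-memory buffer are concatenated into a single data file, and the index records where each partition starts and ends. The sketch below assumes an index layout of `num_partitions + 1` cumulative byte offsets, each encoded as an 8-byte little-endian integer; that layout, `PartitionState`, and `write_data_and_index` are hypothetical, so check the actual writer before relying on the byte format. Generic `Read`/`Write` parameters let temp files and output files be replaced by in-memory buffers in tests.

```rust
use std::io::{self, Read, Write};

/// Hypothetical per-partition state: an optional spill file (any reader)
/// plus the IPC bytes still buffered in memory.
pub struct PartitionState<R: Read> {
    pub spill: Option<R>,
    pub in_memory: Vec<u8>,
}

/// Hypothetical sketch: write each partition's spill contents, then its
/// in-memory buffer, to `data`; write cumulative offsets to `index`.
/// ASSUMPTION: offsets are 8-byte little-endian u64 values.
pub fn write_data_and_index<R: Read, D: Write, I: Write>(
    partitions: &mut [PartitionState<R>],
    data: &mut D,
    index: &mut I,
) -> io::Result<Vec<u64>> {
    let mut offsets = Vec::with_capacity(partitions.len() + 1);
    let mut pos: u64 = 0;
    offsets.push(pos);
    for p in partitions.iter_mut() {
        // Spilled blocks were written earlier, so they come first.
        if let Some(spill) = p.spill.as_mut() {
            pos += io::copy(spill, data)?;
        }
        data.write_all(&p.in_memory)?;
        pos += p.in_memory.len() as u64;
        offsets.push(pos);
    }
    for off in &offsets {
        index.write_all(&off.to_le_bytes())?;
    }
    Ok(offsets)
}
```

An empty partition simply repeats the previous offset, so a reader can locate partition `i` as the byte range `offsets[i]..offsets[i + 1]` of the data file.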
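Selecting a partitioner from `spark.comet.exec.shuffle.partitionerMode` amounts to a small parse with `"immediate"` as the default. In this hypothetical sketch, `PartitionerMode` and `from_config` are illustrative names, matching is exact (case-sensitive), and an unrecognized value is rejected rather than silently falling back; the PR may handle invalid values differently.

```rust
/// Hypothetical enum mirroring the two documented values of
/// spark.comet.exec.shuffle.partitionerMode (names are illustrative).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionerMode {
    /// "immediate" (default): ImmediateModePartitioner
    Immediate,
    /// "buffered": MultiPartitionShuffleRepartitioner
    Buffered,
}

impl PartitionerMode {
    /// `None` means the key is unset, so the default ("immediate") applies.
    /// ASSUMPTION: matching is exact and unknown values are an error.
    pub fn from_config(value: Option<&str>) -> Result<Self, String> {
        match value {
            None | Some("immediate") => Ok(Self::Immediate),
            Some("buffered") => Ok(Self::Buffered),
            Some(other) => Err(format!(
                "unsupported spark.comet.exec.shuffle.partitionerMode: {other:?}"
            )),
        }
    }
}
```

Rejecting unknown values makes a typo in the config fail loudly instead of silently running the default partitioner, which matters when comparing the two modes in benchmarks.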
