andygrove opened a new pull request, #3940:
URL: https://github.com/apache/datafusion-comet/pull/3940

   ## Which issue does this PR close?
   
   Related to #887 (memory over-reservation when running native shuffle write)
   
   ## Rationale for this change
   
   The current `MultiPartitionShuffleRepartitioner` (buffered mode) buffers all 
input batches in memory before writing partitioned output. While 
memory-efficient per-partition (no per-partition builders), it can consume 
significant memory when the total input data is large.
   
   The `ImmediateModePartitioner` from #3845 takes a different approach with 
per-partition Arrow builders, but as discussed in that PR, this causes memory 
to scale linearly with the number of output partitions — problematic for the 
common case of 1000+ partitions.
   
   This PR introduces a third approach: **sort-based repartitioning**. For each 
input batch, it:
   1. Computes partition IDs via hash (same as buffered mode)
   2. Counting-sorts row indices by partition ID — O(n) since partition IDs are 
integers in [0, N)
   3. Uses Arrow `take` to reorder the batch by partition
   4. Slices the sorted batch at partition boundaries (zero-copy)
   5. Writes each slice to per-partition spill files via persistent 
`BufBatchWriter`s with `BatchCoalescer`
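
   Steps 2 to 4 can be sketched in plain Rust without Arrow. This is an
illustrative sketch, not the code in this PR: the function name
`counting_sort_by_partition`, the `u32` index type, and the returned offsets
vector are assumptions made for the example. The sorted indices are what would
be passed to Arrow `take`, and the offsets mark where the reordered batch would
be sliced.

```rust
/// Counting-sort row indices by partition ID (hypothetical helper, not from the PR).
///
/// Returns `(sorted_indices, offsets)`. The rows belonging to partition `p` are
/// `sorted_indices[offsets[p]..offsets[p + 1]]`, in their original relative order.
/// Panics if any partition ID is >= `num_partitions`.
fn counting_sort_by_partition(
    partition_ids: &[u32],
    num_partitions: usize,
) -> (Vec<u32>, Vec<usize>) {
    // Pass 1: count rows per partition, stored one slot to the right so the
    // prefix sum below turns counts into start offsets.
    let mut offsets = vec![0usize; num_partitions + 1];
    for &p in partition_ids {
        offsets[p as usize + 1] += 1;
    }
    for p in 0..num_partitions {
        offsets[p + 1] += offsets[p];
    }

    // Pass 2: scatter each row index into the next free slot of its partition.
    let mut cursor = offsets[..num_partitions].to_vec();
    let mut sorted_indices = vec![0u32; partition_ids.len()];
    for (row, &p) in partition_ids.iter().enumerate() {
        let slot = &mut cursor[p as usize];
        sorted_indices[*slot] = row as u32;
        *slot += 1;
    }

    (sorted_indices, offsets)
}
```

   Both passes touch each row once, so the cost is O(n + N) for n rows and N
partitions, with no comparison sort. An empty partition shows up as two equal
consecutive offsets and produces an empty slice.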
   
   This avoids both buffering all input (as buffered mode does) and maintaining 
per-partition Arrow builders (as immediate mode does).
   
   ## Benchmark Results (macOS M3 Ultra)
   
   **200 partitions, 4 GB memory limit, 100M rows, lz4:**
   
   | Mode | Throughput | Peak RSS |
   |------|-----------|----------|
   | buffered | 2.17 M rows/s | 6.8 GiB |
   | sort | 2.95 M rows/s | 4.0 GiB |
   
   **800 partitions, 4 GB memory limit, 100M rows, lz4:**
   
   | Mode | Throughput | Peak RSS |
   |------|-----------|----------|
   | buffered | 1.99 M rows/s | 7.4 GiB |
   | sort | 2.09 M rows/s | 22.2 GiB |
   
   At moderate partition counts (200), sort-based is 36% faster with 41% less 
peak memory. At 800 partitions, throughput is only about 5% higher while peak 
RSS is roughly 3x that of buffered mode (22.2 GiB vs 7.4 GiB), because of the 
per-partition spill files and write buffers. This trade-off should be 
documented in the tuning guide.
   
   ## What changes are included in this PR?
   
   - New `SortBasedPartitioner` implementing `ShufflePartitioner` trait
   - `PartitionSpillWriter` with persistent `BufBatchWriter` per partition for 
batch coalescing
   - `sort_based: bool` parameter on `ShuffleWriterExec`
   - Config: `spark.comet.exec.shuffle.sort_based` (default: false)
   - Protobuf field in `ShuffleWriter` message
   - Benchmark support: `--mode sort` in `shuffle_bench`
   - `spill_batch` method on `PartitionWriter` (for potential future use)
   
   ## How are these changes tested?
   
   - 4 new Rust unit tests (`test_sort_based_basic`, 
`test_sort_based_insert_larger_batch`, `test_sort_based_insert_smaller_batch`, 
`test_sort_based_large_number_of_partitions`) covering Hash, Range, and 
RoundRobin partitioning
   - All 23 shuffle tests pass
   - Benchmarked with TPC-H SF100 lineitem data at 200 and 800 partitions


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

