andygrove opened a new pull request, #3940: URL: https://github.com/apache/datafusion-comet/pull/3940
## Which issue does this PR close?

Related to #887 (memory over-reservation when running native shuffle write)

## Rationale for this change

The current `MultiPartitionShuffleRepartitioner` (buffered mode) buffers all input batches in memory before writing partitioned output. While memory-efficient per-partition (no per-partition builders), it can consume significant memory when the total input data is large.

The `ImmediateModePartitioner` from #3845 takes a different approach with per-partition Arrow builders, but as discussed in that PR, this causes memory to scale linearly with the number of output partitions, which is problematic for the common case of 1000+ partitions.

This PR introduces a third approach: **sort-based repartitioning**. For each input batch, it:

1. Computes partition IDs via hash (same as buffered mode)
2. Counting-sorts row indices by partition ID, which is O(n) since partition IDs are integers in [0, N)
3. Uses Arrow `take` to reorder the batch by partition
4. Slices the sorted batch at partition boundaries (zero-copy)
5. Writes each slice to per-partition spill files via persistent `BufBatchWriter`s with `BatchCoalescer`

This avoids both buffering all input (as buffered mode does) and per-partition Arrow builders (as immediate mode does).

## Benchmark Results (macOS M3 Ultra)

**200 partitions, 4 GB memory limit, 100M rows, lz4:**

| Mode | Throughput | Peak RSS |
|------|-----------|----------|
| buffered | 2.17 M rows/s | 6.8 GiB |
| sort | 2.95 M rows/s | 4.0 GiB |

**800 partitions, 4 GB memory limit, 100M rows, lz4:**

| Mode | Throughput | Peak RSS |
|------|-----------|----------|
| buffered | 1.99 M rows/s | 7.4 GiB |
| sort | 2.09 M rows/s | 22.2 GiB |

At moderate partition counts (200), sort-based is 36% faster with 41% less memory. At 800 partitions, the per-partition spill files and write buffers cause higher memory usage. This trade-off should be documented in the tuning guide.

## What changes are included in this PR?

- New `SortBasedPartitioner` implementing `ShufflePartitioner` trait
- `PartitionSpillWriter` with persistent `BufBatchWriter` per partition for batch coalescing
- `sort_based: bool` parameter on `ShuffleWriterExec`
- Config: `spark.comet.exec.shuffle.sort_based` (default: false)
- Protobuf field in `ShuffleWriter` message
- Benchmark support: `--mode sort` in `shuffle_bench`
- `spill_batch` method on `PartitionWriter` (for potential future use)

## How are these changes tested?

- 4 new Rust unit tests (`test_sort_based_basic`, `test_sort_based_insert_larger_batch`, `test_sort_based_insert_smaller_batch`, `test_sort_based_large_number_of_partitions`) covering Hash, Range, and RoundRobin partitioning
- All 23 shuffle tests pass
- Benchmarked with TPC-H SF100 lineitem data at 200 and 800 partitions
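
For readers who want a concrete picture of the counting sort, `take`, and slice steps described in the rationale, here is a minimal sketch in plain Rust. It is not the code from this PR: the function names `counting_sort_by_partition`, `take`, and `partition_slices` are hypothetical, a `Vec` stands in for an Arrow column, and the local `take` helper only imitates what the Arrow `take` kernel does for a whole batch.

```rust
use std::ops::Range;

/// Counting-sort row indices by partition ID (hypothetical helper, not the PR's code).
/// Returns the reordered row indices and `num_partitions + 1` start offsets,
/// where partition `p` owns `indices[offsets[p]..offsets[p + 1]]`.
/// Panics if any partition ID is >= `num_partitions`.
pub fn counting_sort_by_partition(
    partition_ids: &[u32],
    num_partitions: usize,
) -> (Vec<u32>, Vec<usize>) {
    // Pass 1: count rows per partition, shifted by one slot so the prefix
    // sum below turns the counts into start offsets in place.
    let mut offsets = vec![0usize; num_partitions + 1];
    for &p in partition_ids {
        offsets[p as usize + 1] += 1;
    }
    for p in 0..num_partitions {
        offsets[p + 1] += offsets[p];
    }

    // Pass 2: scatter each row index into the next free slot of its
    // partition. Rows keep their input order within a partition (stable).
    let mut cursor = offsets.clone();
    let mut indices = vec![0u32; partition_ids.len()];
    for (row, &p) in partition_ids.iter().enumerate() {
        let slot = &mut cursor[p as usize];
        indices[*slot] = row as u32;
        *slot += 1;
    }
    (indices, offsets)
}

/// Stand-in for an Arrow `take` on a single column: gather values by index.
pub fn take<T: Clone>(column: &[T], indices: &[u32]) -> Vec<T> {
    indices.iter().map(|&i| column[i as usize].clone()).collect()
}

/// Row ranges of the non-empty partitions in the reordered batch.
/// In Arrow these ranges would become zero-copy slices of the sorted batch.
pub fn partition_slices(offsets: &[usize]) -> Vec<(usize, Range<usize>)> {
    offsets
        .windows(2)
        .enumerate()
        .filter(|(_, w)| w[0] < w[1])
        .map(|(p, w)| (p, w[0]..w[1]))
        .collect()
}
```

For partition IDs `[2, 0, 2, 1, 0]` with 4 partitions, this sketch yields indices `[1, 4, 3, 0, 2]` and offsets `[0, 2, 3, 5, 5]`, so partitions 0, 1, and 2 map to the row ranges `0..2`, `2..3`, and `3..5` of the reordered batch, and the empty partition 3 produces no slice. Both passes are linear in the number of rows plus the number of partitions, which is why the sort step stays O(n) per batch.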
