andygrove opened a new pull request, #3754: URL: https://github.com/apache/datafusion-comet/pull/3754
## Which issue does this PR close?

Performance optimization for native shuffle write.

## Rationale for this change

The current shuffle write implementation buffers all input batches in memory, then, for each output partition, uses `interleave_record_batch` to gather rows from across all buffered batches. This gather pattern has poor cache locality: with 12,000+ buffered batches spanning ~20 GiB of memory, each gather operation performs scattered random reads across the entire buffer.

Benchmark results on TPC-H SF100 lineitem (100M rows, 200 partitions, LZ4):

| Metric | Gather (before) | Scatter (after) | Improvement |
|---|---|---|---|
| Write time | 68.5s | 38.7s | **44% faster** |
| Throughput | 288 MiB/s | 511 MiB/s | **1.8x** |
| Read time | 0.448s | 0.444s | No regression |

The improvement scales with data size: at 10M rows the gain is ~10%, at 100M rows it is ~44%, because the gather's cache pressure grows with the number of buffered batches.

## What changes are included in this PR?

**Scatter kernel**: Instead of buffering all batches and gathering per partition at write time, this PR scatters each input batch's column values directly into per-partition `PartitionBuffer`s as rows arrive. When a partition buffer reaches `batch_size` rows, it is flushed immediately.

Key changes:

- **`PartitionBuffer`** (new): Typed column buffers (`Fixed`, `Variable`, `LargeVariable`, `Boolean`, `Fallback`) that accumulate values via `append_*` methods and `flush()` to produce a `RecordBatch`
- **`scatter_batch()`**: Column-oriented scatter that iterates by partition for cache locality, so each partition's buffer receives contiguous writes
- **Removes `PartitionedBatchIterator`** and `interleave_record_batch` from the hot path
- **`scatter_time` metric**: New metric tracking time spent in the scatter phase
- **Standalone shuffle benchmark binary** (`shuffle_bench`): For profiling shuffle write/read performance outside of Spark

## How are these changes tested?
- Existing shuffle writer tests (roundtrip, spill, memory) pass with the new scatter kernel
- Standalone benchmark binary validates correctness via read-back verification
- Benchmarked on TPC-H SF100 lineitem with multiple configurations (compression codecs, partition counts, batch sizes)
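To make the scatter-with-immediate-flush idea concrete, here is a minimal sketch in plain Rust. The names `PartitionBuffer`, `scatter_batch`, and `batch_size` come from the PR description; everything else (plain `Vec<i64>` columns instead of Arrow arrays, `flushed` standing in for written `RecordBatch`es) is illustrative and not the actual Comet implementation.

```rust
/// Sketch: accumulates one column's values for a single output partition
/// and "flushes" a completed batch once `batch_size` rows have arrived.
struct PartitionBuffer {
    values: Vec<i64>,
    batch_size: usize,
    flushed: Vec<Vec<i64>>, // stand-in for RecordBatches handed to the writer
}

impl PartitionBuffer {
    fn new(batch_size: usize) -> Self {
        PartitionBuffer {
            values: Vec::with_capacity(batch_size),
            batch_size,
            flushed: Vec::new(),
        }
    }

    fn append(&mut self, v: i64) {
        self.values.push(v);
        // Flush immediately when full, so no more than one batch per
        // partition is ever buffered.
        if self.values.len() >= self.batch_size {
            self.flush();
        }
    }

    fn flush(&mut self) {
        if !self.values.is_empty() {
            self.flushed.push(std::mem::take(&mut self.values));
        }
    }
}

/// Sketch of a column-oriented scatter: first group row indices by partition
/// in one pass, then copy partition by partition so each partition's buffer
/// receives contiguous writes (the cache-locality point from the PR).
fn scatter_batch(column: &[i64], partition_ids: &[u32], buffers: &mut [PartitionBuffer]) {
    let mut rows_per_part: Vec<Vec<usize>> = vec![Vec::new(); buffers.len()];
    for (row, &pid) in partition_ids.iter().enumerate() {
        rows_per_part[pid as usize].push(row);
    }
    for (p, rows) in rows_per_part.iter().enumerate() {
        for &row in rows {
            buffers[p].append(column[row]);
        }
    }
}

fn main() {
    // Four rows alternating between two partitions, batch_size = 2.
    let mut buffers: Vec<PartitionBuffer> = (0..2).map(|_| PartitionBuffer::new(2)).collect();
    scatter_batch(&[10, 20, 30, 40], &[0, 1, 0, 1], &mut buffers);
    // Each partition filled up and flushed exactly one batch.
    assert_eq!(buffers[0].flushed, vec![vec![10, 30]]);
    assert_eq!(buffers[1].flushed, vec![vec![20, 40]]);
    println!("ok");
}
```

The contrast with the old path: the gather approach keeps all input batches alive until write time and reads from all of them per partition, while this scatter shape bounds per-partition state to at most `batch_size` pending rows.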
