andygrove opened a new pull request, #3754:
URL: https://github.com/apache/datafusion-comet/pull/3754

   ## Which issue does this PR close?
   
   Performance optimization for native shuffle write.
   
   ## Rationale for this change
   
   The current shuffle write implementation buffers all input batches in 
memory, then for each output partition, uses `interleave_record_batch` to 
gather rows from across all buffered batches. This gather pattern has poor 
cache locality — with 12,000+ buffered batches spanning ~20 GiB of memory, each 
gather operation performs scattered random reads across the entire buffer.
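   The gather pattern can be illustrated with a plain-Rust sketch (hypothetical helper, not the actual Comet code, which uses Arrow's `interleave_record_batch`): each output row is addressed by a `(batch, row)` pair, so consecutive reads jump between far-apart buffered batches.

```rust
/// Gather values addressed by (batch_index, row_index) pairs out of many
/// buffered batches. Consecutive indices typically point into different
/// batches, so the reads are scattered across the whole buffer — this is
/// the poor-locality access pattern described above.
fn gather(batches: &[Vec<i64>], indices: &[(usize, usize)]) -> Vec<i64> {
    indices
        .iter()
        .map(|&(batch, row)| batches[batch][row])
        .collect()
}
```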
   
   Benchmark results on TPC-H SF100 lineitem (100M rows, 200 partitions, LZ4):
   
   | Metric | Gather (before) | Scatter (after) | Improvement |
   |---|---|---|---|
   | Write time | 68.5s | 38.7s | **44% faster** |
   | Throughput | 288 MiB/s | 511 MiB/s | **1.8x** |
   | Read time | 0.448s | 0.444s | No regression |
   
   The improvement scales with data size — at 10M rows the gain is ~10%, at 
100M rows it's ~44%, because the gather's cache pressure grows with the number 
of buffered batches.
   
   ## What changes are included in this PR?
   
   **Scatter kernel**: Instead of buffering all batches and gathering 
per-partition at write time, this PR scatters each input batch's column values 
directly into per-partition `PartitionBuffer`s as rows arrive. When a partition 
buffer reaches `batch_size` rows, it's flushed immediately.
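   In plain Rust the scatter loop looks roughly like this (a minimal sketch with a single `i64` column and illustrative names; the real kernel works on typed Arrow buffers and produces `RecordBatch`es on flush):

```rust
/// A per-partition buffer that is flushed once it reaches `batch_size` rows.
struct PartitionBuf {
    rows: Vec<i64>,
    flushed: Vec<Vec<i64>>, // stands in for finished RecordBatches
}

/// Scatter one input batch into per-partition buffers. Row indices are
/// bucketed by partition first, so each partition's buffer then receives
/// one contiguous run of appends — the cache-friendly access pattern.
fn scatter_batch(
    batch: &[i64],
    partition_ids: &[usize],
    buffers: &mut [PartitionBuf],
    batch_size: usize,
) {
    // Bucket row indices by destination partition.
    let mut by_part: Vec<Vec<usize>> = vec![Vec::new(); buffers.len()];
    for (row, &pid) in partition_ids.iter().enumerate() {
        by_part[pid].push(row);
    }
    // Write each partition's rows contiguously, flushing full buffers.
    for (part, rows) in by_part.iter().enumerate() {
        let buf = &mut buffers[part];
        for &row in rows {
            buf.rows.push(batch[row]);
            if buf.rows.len() >= batch_size {
                let full = std::mem::take(&mut buf.rows);
                buf.flushed.push(full);
            }
        }
    }
}
```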
   
   Key changes:
   - **`PartitionBuffer`** (new): Typed column buffers (`Fixed`, `Variable`, 
`LargeVariable`, `Boolean`, `Fallback`) that accumulate values with `append_*` 
methods and `flush()` to produce `RecordBatch`
   - **`scatter_batch()`**: Column-oriented scatter that iterates by partition 
for cache locality — each partition's buffer receives contiguous writes
   - **Removes `PartitionedBatchIterator`** and `interleave_record_batch` from 
the hot path
   - **`scatter_time` metric**: New metric tracking time spent in the scatter 
phase
   - **Standalone shuffle benchmark binary** (`shuffle_bench`): For profiling 
shuffle write/read performance outside of Spark
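   The typed column buffers can be sketched as an enum over value representations (a simplified stand-in with illustrative names — the actual `PartitionBuffer` covers the `Fixed`/`Variable`/`LargeVariable`/`Boolean`/`Fallback` variants over Arrow buffers):

```rust
/// Simplified stand-in for a typed column buffer: one variant per value
/// representation, with append methods and a flush that drains the
/// accumulated values into a finished column.
enum ColumnBuf {
    Fixed(Vec<i64>),    // fixed-width values
    Boolean(Vec<bool>), // bit-packed in a real implementation
}

impl ColumnBuf {
    fn len(&self) -> usize {
        match self {
            ColumnBuf::Fixed(v) => v.len(),
            ColumnBuf::Boolean(v) => v.len(),
        }
    }

    fn append_i64(&mut self, v: i64) {
        match self {
            ColumnBuf::Fixed(vals) => vals.push(v),
            _ => panic!("append_i64 on non-Fixed buffer"),
        }
    }

    /// Drain the accumulated values into a finished column, leaving the
    /// buffer empty and ready to receive the next batch's rows.
    fn flush(&mut self) -> ColumnBuf {
        match self {
            ColumnBuf::Fixed(v) => ColumnBuf::Fixed(std::mem::take(v)),
            ColumnBuf::Boolean(v) => ColumnBuf::Boolean(std::mem::take(v)),
        }
    }
}
```

   Dispatching on the variant once per append keeps the hot loop monomorphic per column type rather than going through dynamic Arrow builders row by row.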
   
   ## How are these changes tested?
   
   - Existing shuffle writer tests (roundtrip, spill, memory) pass with the new 
scatter kernel
   - Standalone benchmark binary validates correctness via read-back 
verification
   - Benchmarked on TPC-H SF100 lineitem with multiple configurations 
(compression codecs, partition counts, batch sizes)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

