andygrove opened a new issue, #1432:
URL: https://github.com/apache/datafusion-ballista/issues/1432

   ## Problem
   
   The current sort-based shuffle writer uses DataFusion's `BatchPartitioner::partition()`, which calls `take_arrays()` to split each input batch into per-partition sub-batches. With `N` output partitions, a single 8192-row input batch produces up to `N` sub-batches averaging `8192/N` rows each; with 200 output partitions, that is ~41 rows per batch.
   
   These tiny batches are:
   - Appended individually to `PartitionBuffer`
   - Spilled individually to IPC files
   - Written individually to the final output file
   
   Each batch carries per-batch Arrow IPC overhead (metadata, alignment 
padding), and the large number of small batches hurts compression ratios and 
read performance.
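   
   For illustration, here is a minimal, self-contained sketch of the splitting pattern described above. This is hypothetical code, not Ballista's actual implementation, and the `row % n` partitioner merely stands in for the real hash partitioner:
   
   ```rust
   use std::sync::Arc;
   
   use arrow::array::{ArrayRef, Int64Array, UInt32Array};
   use arrow::compute::take;
   use arrow::datatypes::{DataType, Field, Schema};
   use arrow::error::ArrowError;
   use arrow::record_batch::RecordBatch;
   
   /// Split one batch into `n` per-partition sub-batches with `take`,
   /// mirroring what `BatchPartitioner::partition()` does via `take_arrays()`.
   fn split_batch(batch: &RecordBatch, n: usize) -> Result<Vec<RecordBatch>, ArrowError> {
       (0..n)
           .map(|p| {
               // Rows assigned to partition `p`; `row % n` fakes the hash.
               let indices = UInt32Array::from_iter_values(
                   (0..batch.num_rows() as u32).filter(|row| *row as usize % n == p),
               );
               let columns = batch
                   .columns()
                   .iter()
                   .map(|c| take(c.as_ref(), &indices, None))
                   .collect::<Result<Vec<ArrayRef>, _>>()?;
               RecordBatch::try_new(batch.schema(), columns)
           })
           .collect()
   }
   
   fn main() -> Result<(), ArrowError> {
       let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
       let batch = RecordBatch::try_new(
           schema,
           vec![Arc::new(Int64Array::from_iter_values(0..8192)) as ArrayRef],
       )?;
       let parts = split_batch(&batch, 200)?;
       // 200 sub-batches of roughly 8192 / 200 ≈ 41 rows each
       println!("{} sub-batches, {} rows in the first", parts.len(), parts[0].num_rows());
       Ok(())
   }
   ```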
   
   ## Proposed Solution
   
   Adopt the approach used by [Apache DataFusion Comet](https://github.com/apache/datafusion-comet/blob/main/native/core/src/execution/shuffle/shuffle_writer.rs), which avoids this problem by using `arrow::compute::interleave_record_batch`:
   
   1. **Buffer input batches whole** — store unmodified input batches in a 
`Vec<RecordBatch>` instead of immediately splitting them per partition
   2. **Track indices, not data** — for each row, record `(batch_index, 
row_index)` into per-partition index vectors (`Vec<Vec<(u32, u32)>>`)
   3. **Deferred materialization** — when it's time to write (spill or final 
output), call `interleave_record_batch(&buffered_batches, &indices)` to gather 
rows into well-sized output batches (up to the configured `batch_size`)
   
   This produces properly sized output batches regardless of the number of output partitions, with a single efficient gather operation at write time instead of many small `take_arrays` calls.
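   
   A hedged sketch of steps 1–3, assuming arrow's `interleave_record_batch` as named above; the `ShuffleBuffer` type and the `partition_of` closure are illustrative, not Comet's or Ballista's actual API:
   
   ```rust
   use arrow::compute::interleave_record_batch;
   use arrow::error::ArrowError;
   use arrow::record_batch::RecordBatch;
   
   /// Illustrative buffer; not an actual Ballista type.
   struct ShuffleBuffer {
       /// Step 1: input batches stored whole, unsplit.
       batches: Vec<RecordBatch>,
       /// Step 2: per-partition (batch_index, row_index) pairs. The proposal
       /// stores these as (u32, u32); `usize` here feeds
       /// `interleave_record_batch` directly.
       indices: Vec<Vec<(usize, usize)>>,
   }
   
   impl ShuffleBuffer {
       fn new(num_partitions: usize) -> Self {
           Self {
               batches: Vec::new(),
               indices: vec![Vec::new(); num_partitions],
           }
       }
   
       /// Buffer a batch whole and record where each row belongs;
       /// `partition_of` stands in for the real hash-based row partitioner.
       fn append(&mut self, batch: RecordBatch, partition_of: impl Fn(usize) -> usize) {
           let batch_idx = self.batches.len();
           for row in 0..batch.num_rows() {
               self.indices[partition_of(row)].push((batch_idx, row));
           }
           self.batches.push(batch);
       }
   
       /// Step 3: deferred materialization. Gather one partition's rows into
       /// output batches of at most `batch_size` rows each.
       fn drain_partition(
           &self,
           partition: usize,
           batch_size: usize,
       ) -> Result<Vec<RecordBatch>, ArrowError> {
           let refs: Vec<&RecordBatch> = self.batches.iter().collect();
           self.indices[partition]
               .chunks(batch_size)
               .map(|chunk| interleave_record_batch(&refs, chunk))
               .collect()
       }
   }
   ```
   
   At spill or final-write time, each partition drains into batches of up to `batch_size` rows, independent of the number of output partitions.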
   
   ### Comparison
   
   | Aspect | Current | Proposed |
   |--------|---------|----------|
   | When rows are partitioned | Immediately per input batch | Deferred — only indices tracked |
   | Output batch sizes | `input_rows / N` (tiny) | Up to `batch_size` (well-sized) |
   | Data copies | `take_arrays` per partition per batch | One `interleave` call at write time |
   
   ### Additional Consideration
   
   Comet also integrates with DataFusion's `MemoryReservation` / 
`MemoryConsumer` for memory-aware spilling instead of a fixed byte threshold. 
This could be adopted as a follow-up improvement.
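   
   For reference, a minimal sketch of registering a writer with DataFusion's memory pool; the pool size is a placeholder (in practice the pool comes from the session's runtime environment), and `spill_to_disk` is a hypothetical hook:
   
   ```rust
   use std::sync::Arc;
   
   use datafusion::error::Result;
   use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
   
   fn main() -> Result<()> {
       // 64 MiB pool; illustrative only.
       let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(64 * 1024 * 1024));
       let mut reservation = MemoryConsumer::new("ShuffleWriter").register(&pool);
   
       let batch_bytes = 1024 * 1024; // illustrative size of a newly buffered batch
       if reservation.try_grow(batch_bytes).is_err() {
           // Pool exhausted: spill buffered batches to disk and release the
           // reservation, rather than spilling at a fixed byte threshold.
           // spill_to_disk()?; (hypothetical)
           reservation.free();
       }
       Ok(())
   }
   ```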

