andygrove opened a new issue, #1432: URL: https://github.com/apache/datafusion-ballista/issues/1432
## Problem

The current sort-based shuffle writer uses DataFusion's `BatchPartitioner::partition()`, which calls `take_arrays()` to split each input batch into per-partition sub-batches. With `N` output partitions, a single input batch of 8192 rows produces up to `N` sub-batches averaging `8192/N` rows each. With 200 output partitions, that's ~41 rows per batch. These tiny batches are:

- Appended individually to `PartitionBuffer`
- Spilled individually to IPC files
- Written individually to the final output file

Each batch carries per-batch Arrow IPC overhead (metadata, alignment padding), and the large number of small batches hurts compression ratios and read performance.

## Proposed Solution

Adopt the approach used by [Apache DataFusion Comet](https://github.com/apache/datafusion-comet/blob/main/native/core/src/execution/shuffle/shuffle_writer.rs), which avoids this problem using `arrow::compute::interleave_record_batch`:

1. **Buffer input batches whole** — store unmodified input batches in a `Vec<RecordBatch>` instead of immediately splitting them per partition
2. **Track indices, not data** — for each row, record `(batch_index, row_index)` into per-partition index vectors (`Vec<Vec<(u32, u32)>>`)
3. **Deferred materialization** — when it's time to write (spill or final output), call `interleave_record_batch(&buffered_batches, &indices)` to gather rows into well-sized output batches (up to the configured `batch_size`)

This produces properly sized output batches regardless of the number of output partitions, with a single efficient gather operation instead of many small `take_arrays` calls.
### Comparison

| Aspect | Current | Proposed |
|--------|---------|----------|
| When rows are partitioned | Immediately, per input batch | Deferred — only indices tracked |
| Output batch sizes | `input_rows / N` (tiny) | Up to `batch_size` (well-sized) |
| Data copies | `take_arrays` per partition per batch | One `interleave` call at write time |

### Additional Consideration

Comet also integrates with DataFusion's `MemoryReservation` / `MemoryConsumer` for memory-aware spilling instead of a fixed byte threshold. This could be adopted as a follow-up improvement.
