andygrove opened a new pull request, #1389: URL: https://github.com/apache/datafusion-ballista/pull/1389
## Summary

This PR adds an alternative shuffle implementation that writes a single consolidated file per input partition (sorted by output partition ID) along with an index file, similar to Apache Spark's sort-based shuffle. **This is a work-in-progress and not ready for merge.**

### Benefits

- Reduces the file count from `N × M` (N input partitions × M output partitions) to `2 × N` files
- In-memory buffering with configurable spill-to-disk support
- Index file enables efficient seeking to a specific partition's data

### New Components

- `SortShuffleWriterExec`: ExecutionPlan implementation for sort-based shuffle
- `PartitionBuffer`: In-memory buffering per output partition
- `SpillManager`: Handles spilling buffers to disk when memory pressure is high
- `ShuffleIndex`: Index file I/O using little-endian i64 offsets (see the index-layout sketch at the end of this description)
- `SortShuffleConfig`: Configuration options

### New Configuration Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `ballista.shuffle.sort_based.enabled` | bool | false | Enable sort-based shuffle |
| `ballista.shuffle.sort_based.buffer_size` | bytes | 1 MB | Per-partition buffer size |
| `ballista.shuffle.sort_based.memory_limit` | bytes | 256 MB | Total memory limit for buffers |
| `ballista.shuffle.sort_based.spill_threshold` | float | 0.8 | Spill when buffered memory exceeds this fraction of the memory limit |

### References

This implementation is inspired by:

- **Apache DataFusion Comet's shuffle writer**: https://github.com/apache/datafusion-comet/blob/main/native/core/src/execution/shuffle/shuffle_writer.rs
- **Apache Spark's sort-based shuffle**: https://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations

## Work Remaining

- [ ] Update `DistributedPlanner` to use `SortShuffleWriterExec` when enabled
- [ ] Update `ShuffleReaderExec` to detect and read the sort shuffle format
- [ ] Add protobuf serialization for the new execution plan
- [ ] Implement proper byte offset tracking in the index file
- [ ] End-to-end integration tests
- [ ] TPC-H benchmark comparison

## Test plan

- [x] Unit tests for all new components (15 tests passing)
- [x] `cargo build` passes
- [x] `cargo test -p ballista-core` passes (43 tests)

🤖 Generated with [Claude Code](https://claude.ai/code)
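The index layout is only summarized above (little-endian i64 offsets that let a reader seek directly to one output partition's bytes), so here is a minimal Rust sketch of one way such a file could be written and read. The function names, the cumulative-offset layout, and the plain-`File` API are illustrative assumptions, not the actual `ShuffleIndex` code in this PR.

```rust
// Minimal sketch, assuming the index file stores (num_partitions + 1) cumulative
// byte offsets into the consolidated data file as little-endian i64 values.
// Names and layout details beyond "little-endian i64 offsets" are assumptions.
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom, Write};

/// Write cumulative byte offsets, one more entry than there are output partitions.
fn write_index(path: &str, offsets: &[i64]) -> io::Result<()> {
    let mut file = File::create(path)?;
    for offset in offsets {
        file.write_all(&offset.to_le_bytes())?;
    }
    Ok(())
}

/// Read every offset back from an index file.
fn read_index(path: &str) -> io::Result<Vec<i64>> {
    let mut bytes = Vec::new();
    File::open(path)?.read_to_end(&mut bytes)?;
    bytes
        .chunks_exact(8)
        .map(|chunk| Ok(i64::from_le_bytes(chunk.try_into().unwrap())))
        .collect()
}

/// Read only the bytes belonging to one output partition from the data file.
fn read_partition(data_path: &str, offsets: &[i64], partition: usize) -> io::Result<Vec<u8>> {
    let start = offsets[partition];
    let len = (offsets[partition + 1] - start) as usize;
    let mut file = File::open(data_path)?;
    file.seek(SeekFrom::Start(start as u64))?;
    let mut buf = vec![0u8; len];
    file.read_exact(&mut buf)?;
    Ok(buf)
}
```

With a cumulative layout like this, a reader that needs only partition `p` opens the single consolidated data file, seeks to `offsets[p]`, and reads `offsets[p + 1] - offsets[p]` bytes, which is what makes the `2 × N` file layout practical to consume.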

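As a reading aid for the configuration table above, this is a small, hypothetical sketch of how `memory_limit` and `spill_threshold` could combine into a spill decision; the struct and method names are made up for illustration, and the actual `SpillManager`/`SortShuffleConfig` code in this PR may be organized differently.

```rust
// Hypothetical illustration of the memory_limit / spill_threshold interaction
// described in the configuration table; not the code added by this PR.
struct SpillConfig {
    memory_limit_bytes: usize, // ballista.shuffle.sort_based.memory_limit (default 256 MB)
    spill_threshold: f64,      // ballista.shuffle.sort_based.spill_threshold (default 0.8)
}

impl SpillConfig {
    /// Spill once total buffered bytes exceed the configured fraction of the
    /// memory limit; with the defaults that is 0.8 * 256 MB ≈ 204.8 MB.
    fn should_spill(&self, buffered_bytes: usize) -> bool {
        buffered_bytes as f64 > self.memory_limit_bytes as f64 * self.spill_threshold
    }
}

fn main() {
    let cfg = SpillConfig {
        memory_limit_bytes: 256 * 1024 * 1024,
        spill_threshold: 0.8,
    };
    assert!(!cfg.should_spill(100 * 1024 * 1024)); // 100 MB buffered: keep buffering
    assert!(cfg.should_spill(220 * 1024 * 1024)); // 220 MB buffered: spill to disk
}
```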