jja725 opened a new issue, #1539:
URL: https://github.com/apache/datafusion-ballista/issues/1539

   ## Is your feature request related to a problem or challenge?
   
   Ballista currently stores shuffle data on local executor disks and serves it 
via Arrow Flight between executors. This creates several limitations:
   
   - **Fault tolerance**: If an executor crashes, its shuffle data is lost and 
the entire job must restart from the beginning.
   - **Resource coupling**: Compute (executor CPU/memory) and shuffle storage 
are tightly coupled. Executors must remain alive for the full duration of the 
job to serve shuffle data to downstream stages.
   - **Scalability**: With M map tasks and N reduce tasks, executors must hold 
shuffle data for all downstream consumers, which limits independent scaling of 
compute vs. shuffle storage.
   - **Elasticity**: Executors cannot be reclaimed or scaled down between 
stages because they are still serving shuffle data.
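To make the scalability point concrete, here is a back-of-the-envelope comparison of transfer fan-out (illustrative task counts, not from this issue; it simplifies by counting one pull per reduce task against a merging RSS):

```rust
fn main() {
    // Illustrative map/reduce task counts.
    let (map_tasks, reduce_tasks) = (1_000u64, 1_000u64);

    // Current model: each reduce task may open a fetch against each
    // map task's output, so connections grow multiplicatively.
    let direct_fetches = map_tasks * reduce_tasks;

    // RSS model (simplified): each map task pushes its output once and
    // each reduce task pulls its pre-merged partition once.
    let rss_transfers = map_tasks + reduce_tasks;

    println!("{direct_fetches} direct fetches vs {rss_transfers} RSS transfers");
}
```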
   
   ## Describe the solution you'd like
   
   Add support for an **external Remote Shuffle Service (RSS)** as a pluggable 
shuffle storage and serving layer. Instead of writing shuffle data to local 
executor disks and serving it via Flight, executors would push shuffle data to 
a dedicated external service, and reduce tasks would pull from it.
   
   Popular open-source RSS implementations that could serve as integration 
targets:
   
   - **[Apache Celeborn](https://celeborn.apache.org/)** (formerly Alibaba RSS) 
— push-based, supports Spark/Flink/MR
   - **[Apache Uniffle](https://uniffle.apache.org/)** (formerly Tencent RSS) — 
supports multiple compute engines
   
   ### High-level design ideas
   
   1. **Pluggable shuffle writer**: Introduce a `ShuffleWriter` 
trait/abstraction so the current local-disk writer and a new RSS writer can be 
swapped via configuration.
   2. **RSS push on map completion**: After a map stage task finishes, shuffle 
blocks are pushed to the RSS instead of being left on local disk.
   3. **RSS pull on reduce**: The shuffle reader fetches partitions from the 
RSS endpoint rather than connecting back to the producing executor via Flight.
   4. **Scheduler awareness**: The scheduler should no longer need to track 
per-executor partition locations; instead, it queries the RSS for them.
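The pluggable abstraction in idea 1 could be sketched roughly as below. All names here (`ShuffleWriter`, `PartitionLocation`, the two writer structs) are hypothetical and chosen for illustration; they are not existing Ballista APIs, and a real version would be async and deal in Arrow record batches rather than raw bytes:

```rust
/// Where a finished shuffle partition can be fetched from (hypothetical).
#[derive(Debug, Clone, PartialEq)]
enum PartitionLocation {
    /// Served by the producing executor over Arrow Flight (current model).
    Executor { host: String, port: u16 },
    /// Served by an external Remote Shuffle Service (proposed model).
    Rss { endpoint: String, shuffle_id: u64 },
}

/// Pluggable sink for map-stage shuffle output. The scheduler only ever
/// sees `PartitionLocation`s, so swapping implementations is transparent.
trait ShuffleWriter {
    /// Persist one partition's bytes and report where readers can find it.
    fn write_partition(&mut self, partition: usize, data: &[u8]) -> PartitionLocation;
}

/// Current behaviour: keep data on local disk, serve it via Flight.
struct LocalDiskShuffleWriter {
    host: String,
    port: u16,
}

impl ShuffleWriter for LocalDiskShuffleWriter {
    fn write_partition(&mut self, _partition: usize, _data: &[u8]) -> PartitionLocation {
        // A real implementation would spill `data` to a local file here.
        PartitionLocation::Executor { host: self.host.clone(), port: self.port }
    }
}

/// Proposed behaviour: push the partition to an external RSS.
struct RssShuffleWriter {
    endpoint: String,
    shuffle_id: u64,
}

impl ShuffleWriter for RssShuffleWriter {
    fn write_partition(&mut self, _partition: usize, _data: &[u8]) -> PartitionLocation {
        // A real implementation would push `data` through an RSS client
        // (e.g. a Celeborn or Uniffle client) here.
        PartitionLocation::Rss { endpoint: self.endpoint.clone(), shuffle_id: self.shuffle_id }
    }
}

fn main() {
    // Selecting the writer (in practice via configuration) changes where
    // downstream stages fetch from, without touching the map task itself.
    let mut writer: Box<dyn ShuffleWriter> = Box::new(RssShuffleWriter {
        endpoint: "rss-host:9097".into(),
        shuffle_id: 42,
    });
    let location = writer.write_partition(0, b"partition bytes");
    println!("{location:?}");
}
```

The key design point is that the reduce-side reader and the scheduler dispatch on `PartitionLocation`, so the local-disk path and the RSS path can coexist behind one configuration switch.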
   
   ### Benefits
   
   - **Better fault tolerance**: Shuffle data survives executor failures.
   - **Executor elasticity**: Executors can be released after map stages 
complete.
   - **Decoupled scaling**: Shuffle storage and compute can scale independently.
   - **Reduced network pressure on executors**: Dedicated shuffle servers can 
handle merging and serving more efficiently.
   
   ## Alternatives considered
   
   - **Issue #1151** proposes streaming push-based shuffle (in-memory, direct 
executor-to-executor). This is complementary but different — it targets 
low-latency queries where stages overlap, whereas RSS targets fault tolerance 
and resource elasticity for batch workloads.
   - Continuing to improve the existing local-disk + Flight approach (see 
#1319) is useful but does not address the fault tolerance or elasticity gaps.
   
   ## Additional context
   
   - Apache Spark has shipped an external shuffle service since Spark 1.2 and 
added push-based shuffle in Spark 3.2 (SPARK-30602).
   - Flink supports pluggable shuffle service via `ShuffleServiceFactory`.
   - Presto/Trino have ongoing work on native spooling shuffle.
   
   Is there community interest in this direction? Happy to help with design, 
prototyping, or integration work.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

