Wenchen,
Thanks for being supportive of the idea.
I think I missed addressing some parts of your first email.

*we can decrease the number of nodes that host active shuffle data, so that
> the cluster is more elastic.*


- We can write to local storage since we are using the Hadoop API.
- We could optimize task placement to run on hosts containing active
shuffle data.
- However, this poses a risk of disks filling up sooner( especially with
large shuffles), leading to task failures. This issue is amplified in
multi-user/session environments (e.g., Spark Connect) where unrelated jobs
might fill the disk, causing confusing diagnostic issues.
- *I propose offering this as an optional setting/config that knowledgeable
users can enable, particularly in environments with sufficient local
storage capacity.*

*push-based shuffle framework to support other deployments (standalone,
> k8s) and remote storage?*


- Since the existing framework is YARN-dependent, retrofitting it for other
resource managers would require extensive logic for handling resource
management and shuffle service deployment differences.
- I prefer keeping the consolidation stage separate for a cleaner, more
generalized design.
*- However, if a significant architectural overlap becomes apparent, then
integration should certainly be reconsidered.*


Hi Enrico,
Thanks for the ideas.
I think double writes(either as a direct write or a copy after local
write), would
-  Increases overhead and adds more failure points.
- Executors holding onto shuffle data would prevent aggressive cluster
downscaling.
*While we have slightly lower latency, I think the delta between local and
remote reads is small compared to overall job time.*
For small jobs, it would be beneficial to bypass the shuffle consolidation
stage dynamically at runtime to avoid unnecessary overhead.

* Is "shuffle consolidation" the preferred term?*


While the existing Spark term is "shuffle merge," which also involves
combining shuffle blocks, I am using "shuffle consolidation"  mainly for
disambiguation.
*- Shuffle merge *- combining few shuffle blocks
*- Shuffle consolidation* - merge all fragmented shuffle blocks for a given
reducer partition
*However, I don't have a strong opinion on which term is ultimately used,
as long as the function is clearly understood.*

Regards
Karuppayya


On Wed, Dec 3, 2025 at 2:37 AM Enrico Minack <[email protected]> wrote:

> Hi Karuppayya,
>
> Thanks for the clarification.
>
> I would like to emphasize that a solution would be great that allows to
> prefer shuffle data from executors over remote storage:
> If the shuffle consolidation*) stage merges mapper outputs into reducer
> inputs and stores those on a remote storage, it could easily keep a copy on
> the consolidating executors. For the lifetime of these executors, reducer
> executors could preferably fetch the consolidated shuffle data from the
> consolidating executors, with the obvious benefits. Only for decommissioned
> consolidation executors or consolidation executors on failed nodes, reducer
> executors fetch consolidated shuffle data from the remote storage.
>
> Further, splitting the proposed shuffle consolidation stage into:
> 1) consolidate shuffle data to executor local shuffle storage (AQE-based
> shuffle consolidation stage as proposed by Wenchen Fan)
> 2) replicate local shuffle storage to remote shuffle storage
> feels like a natural separation of concerns. And it allows for custom
> configuration to employ only the former without the latter or vice versa.
>
> Speaking of the ability to read shuffle data from executors during their
> existence while falling back to the remote storage replica reminds of the
> existing Fallback Storage logic. Feature 2) could be implemented by
> evolving existing infrastructure (Fallback Storage) with only hundred lines
> of code. The read-from-executors-first-then-remote-storage feature would
> cost a few more hundred lines of code.
>
> Cheers,
> Enrico
>
>
> *) Is "shuffle consolidation" the preferred term? Isn't the existing
> Spark term "shuffle merge" exactly what is being described here, maybe with
> some small extension?
>
>
> Am 01.12.25 um 20:30 schrieb karuppayya:
>
> Hi everyone,
> Thank you all for your valuable comments and discussion on the design
> document/this email. I have replied to the comments/concerns raised.
> I welcome any other questions and to be challenged further.
>
> Also *Sun Chao* accepted to shepherd this proposal(Thank you!)
>
> If there are no other open questions by Wednesday morning (PST), I will
> request Chao to open the official voting thread (which should give 72
> hours for the process
> <https://spark.apache.org/improvement-proposals.html>).
>
> - Karuppayya
>
> On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua <
> [email protected]> wrote:
>
>> One aspect that hasn’t been mentioned yet (or so I think) is the
>> thread-level behavior of shuffle. In large workloads with many small
>> shuffle blocks, I’ve repeatedly observed executors spawning hundreds of
>> threads tied to shuffle fetch operations, Netty client handlers, and block
>> file access.
>> Since the proposal changes block granularity and fetch patterns, it would
>> be valuable to explicitly consider how the consolidation stage affects:
>> – the number of concurrent fetch operations
>> – thread pool growth / saturation
>> – Netty transport threads
>> – memory pressure from large in-flight reads
>>
>> Btw, I find your proposal quite interesting.
>>
>> El mar, 18 nov 2025, 19:33, karuppayya <[email protected]>
>> escribió:
>>
>>> Rishab, Wenchen, Murali,
>>> Thank you very much for taking the time to review the proposal and for
>>> providing such thoughtful and insightful comments/questions.
>>>
>>> *Rishab*,
>>>
>>>> * suitable across all storage systems*
>>>
>>> You are right that the suitability is somewhat subjective and dependent
>>> on cloud provider used.
>>> In general, the goal of ShuffleVault is to utilize the standard Hadoop
>>> FileSystem APIs, which means it should work seamlessly with popular cloud
>>> and distributed file systems (like S3, HDFS, GFS, etc.).
>>> These systems share a similar nature and are designed for large files.
>>>
>>> *large file could create problems during retrieval.*
>>>
>>> We are mitigating this risk by ensuring that  tasks do not read the
>>> entire consolidated file at once.
>>> Instead, the implementation is designed to read the data in configured
>>> blocks, rather than relying on a single read. *This behavior can be
>>> refined/validated* to make it more robust.
>>>
>>> *Wenchen,*
>>> I fully agree that the operational details around using cloud storage
>>> for shuffle—specifically traffic throttling, cleanup guarantees
>>> and overall request-related network cost — these are critical issues
>>> that must be solved.
>>> The consolidation stage is explicitly designed to mitigate the
>>> throttling and accompanying cost issues .
>>> *Throttling* - By consolidating shuffle data, this approach transforms
>>> the read pattern from a multitude of small, random requests into fewer,
>>> large, targeted ones. Particularly beneficial for modern cloud object
>>> storage.
>>> *Shuffle cleanup* -  I am actively trying to leverage the Shufffle
>>> clean up mode and also making an effort to make them robust .These cleanup
>>> improvements should be beneficial, regardless of this proposal and cover
>>> most cases.
>>> However, I agree that to ensure *no orphaned files* remain, we will
>>> still require other means (such as remote storage lifecycle policies or
>>> job-specific scripts) for a guaranteed cleanup.
>>> Thank you again for your valuable feedback, especially the validation on
>>> synchronous scheduling and AQE integration.
>>>
>>> *Murali,*
>>>
>>>> * Doing an explicit sort stage*
>>>
>>> To clarify, ShuffleVault does not introduce an explicit sort stage.
>>> Instead, it introduces a Shuffle Consolidation Stage.
>>> This stage is a pure passthrough operation that only aggregates
>>> scattered shuffle data for a given reducer partition.
>>> In simple terms, it functions as an additional reducer stage that reads
>>> the fragmented shuffle files from the mappers and writes them as a single,
>>> consolidated, durable file to remote storage.
>>>
>>> *but that would be a nontrivial change *
>>>
>>> I agree that the change is significant, I am  actively working to ensure
>>> the benefits are leveraged across the stack. This PR
>>> <https://github.com/apache/spark/pull/53028> demonstrates integration
>>> with AQE and interactions with other rules(Exchange reuse, Shuffle
>>> Partition Coalescing ect).
>>> I would genuinely appreciate it if you could take a look at the POC PR
>>> to see the scope00 of changes. The primary logic is encapsulated within a
>>> new Spark Physical Planner Rule
>>> <https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6>
>>> that injects the consolidation stage, which is the main crux.
>>>
>>> I welcome any further questions or comments!
>>>
>>> Thanks & Regards
>>> Karuppayya
>>>
>>> On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan <[email protected]>
>>> wrote:
>>>
>>>>
>>>> There are existing Apache projects which provide the capabilities which
>>>> largely addresses the problem statement - Apache Celeborn, Apache Uniffle,
>>>> Zeus, etc.
>>>> Doing an explicit sort stage, between "map" and "reduce" brings with it
>>>> some nice advantages, especially if the output is durable, but that would
>>>> be a nontrivial change - and should be attempted if the benefits are being
>>>> leveraged throughout the stack (AQE, speculative execution, etc)
>>>>
>>>> Regards,
>>>> Mridul
>>>>
>>>> On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Karuppayya,
>>>>>
>>>>> Handling large shuffles in Spark is challenging and it's great to see
>>>>> proposals addressing it. I think the extra "shuffle consolidation stage" 
>>>>> is
>>>>> a good idea, and now I feel it's better for it to be synchronous, so that
>>>>> we can integrate it with AQE and leverage the accurate runtime shuffle
>>>>> status to make decisions about whether or not to launch this extra 
>>>>> "shuffle
>>>>> consolidation stage" and how to consolidate. This is a key differentiator
>>>>> compared to the push-based shuffle.
>>>>>
>>>>> However, there are many details to consider, and in general it's
>>>>> difficult to use cloud storage for shuffle. We need to deal with problems
>>>>> like traffic throttling, cleanup guarantee, cost control, and so on. Let's
>>>>> take a step back and see what are the actual problems of large shuffles.
>>>>>
>>>>> Large shuffle usually starts with a large number of mappers that we
>>>>> can't adjust (e.g. large table scan). We can adjust the number of reducers
>>>>> to reach two goals:
>>>>> 1. The input data size of each reducer shouldn't be too large, which
>>>>> is roughly *total_shuffle_size / num_reducers*. This is to avoid
>>>>> spilling/OOM during reducer task execution.
>>>>> 2. The data size of each shuffle block shoudn't be too small, which is
>>>>> roughly *total_shuffle_size / (num_mappers * num_reducers)*. This is
>>>>> for the good of disk/network IO.
>>>>>
>>>>> These two goals are actually contradictory and sometimes we have to
>>>>> prioritize goal 1 (i.e. pick a large *num_reducers*) so that the
>>>>> query can finish. An extra "shuffle consolidation stage" can kind of
>>>>> decrease the number of mappers, by merging the shuffle files from multiple
>>>>> mappers. This can be a clear win as fetching many small shuffle blocks can
>>>>> be quite slow, even slower than running an extra "shuffle consolidation
>>>>> stage".
>>>>>
>>>>> In addition, the nodes that host shuffle files shouldn't be too many
>>>>> (best to be 0 which means shuffle files are stored in a different 
>>>>> storage).
>>>>> With a large number of mappers, likely every node in the cluster stores
>>>>> some shuffle files. By merging shuffle files via the extra "shuffle
>>>>> consolidation stage", we can decrease the number of nodes that host active
>>>>> shuffle data, so that the cluster is more elastic.
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Wenchen
>>>>>
>>>>> On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Karuppayya,
>>>>>>
>>>>>> Thanks for sharing the proposal and this looks very exciting.
>>>>>>
>>>>>> I have a few questions and please correct me if I misunderstood
>>>>>> anything.
>>>>>>
>>>>>> Would it be possible to clarify whether the consolidated shuffle file
>>>>>> produced for each partition is suitable across all storage systems,
>>>>>> especially when this file becomes extremely large? I am wondering if a 
>>>>>> very
>>>>>> large file could create problems during retrieval. For example, if a
>>>>>> connection breaks while reading the file, some storage systems may not
>>>>>> support resuming reads from the point of failure and start reading the 
>>>>>> file
>>>>>> from the beginning again. This could lead to higher latency, repeated
>>>>>> retries, or performance bottlenecks when a partition becomes too large or
>>>>>> skewed?
>>>>>>
>>>>>> Would it make sense to introduce a configurable upper-bound on the
>>>>>> maximum allowed file size? This might prevent the file from growing
>>>>>> massively.
>>>>>> Should the consolidated shuffle file be compressed before being
>>>>>> written to the storage system. Compression might introduce additional
>>>>>> latency but that too can be a configurable option.
>>>>>>
>>>>>> Regards,
>>>>>> Rishab Joshi
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Nov 13, 2025 at 9:14 AM karuppayya <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Enrico,
>>>>>>> Thank you very much for reviewing the doc.
>>>>>>>
>>>>>>> *Since the consolidation stage reads all the shuffle data, why not
>>>>>>>> doing the transformation in that stage? What is the point in deferring 
>>>>>>>> the
>>>>>>>> transformations into another stage?*
>>>>>>>
>>>>>>>
>>>>>>> The reason for deferring the final consolidation to a subsequent
>>>>>>> stage lies in the distributed nature of shuffle data.
>>>>>>> Reducer requires reading all corresponding shuffle data written
>>>>>>> across all map tasks. Since each mapper only holds its own local output,
>>>>>>> the consolidation cannot begin until all the map stage completes.
>>>>>>>
>>>>>>> However, your question is also aligned to one of the approaches
>>>>>>> mentioned (concurrent consolidation
>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>),
>>>>>>> which was specifically considered.
>>>>>>> While the synchronous consolidation happens afetr all the data is
>>>>>>> available , concurrent consolidation can aggregate and persist the
>>>>>>> already-generated shuffle data to begin concurrently with the remaining 
>>>>>>> map
>>>>>>> tasks, thereby making the shuffle durable much earlier instead of 
>>>>>>> having to
>>>>>>> wait for all map tasks to complete.
>>>>>>>
>>>>>>> - Karuppayya
>>>>>>>
>>>>>>> On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> another remark regarding a remote shuffle storage solution:
>>>>>>>> As long as the map executors are alive, reduce executors should
>>>>>>>> read from them to avoid any extra delay / overhead.
>>>>>>>> On fetch failure from a map executor, the reduce executors should
>>>>>>>> fall back to a remote storage that provides a copy (merged or not) of 
>>>>>>>> the
>>>>>>>> shuffle data.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Enrico
>>>>>>>>
>>>>>>>>
>>>>>>>> Am 13.11.25 um 09:42 schrieb Enrico Minack:
>>>>>>>>
>>>>>>>> Hi Karuppayya,
>>>>>>>>
>>>>>>>> thanks for your proposal and bringing up this issue.
>>>>>>>>
>>>>>>>> I am very much in favour of a shuffle storage solution that allows
>>>>>>>> for dynamic allocation and node failure in a K8S environment, without 
>>>>>>>> the
>>>>>>>> burden of managing an Remote Shuffle Service.
>>>>>>>>
>>>>>>>> I have the following comments:
>>>>>>>>
>>>>>>>> Your proposed consolidation stage is equivalent to the next reducer
>>>>>>>> stage in the sense that it reads shuffle data from the earlier map 
>>>>>>>> stage.
>>>>>>>> This requires the executors of the map stage to survive until the 
>>>>>>>> shuffle
>>>>>>>> data are consolidated ("merged" in Spark terminology). Therefore, I 
>>>>>>>> think
>>>>>>>> this passage of your design document is not accurate:
>>>>>>>>
>>>>>>>>     Executors that perform the initial map tasks (shuffle writers)
>>>>>>>> can be immediately deallocated after writing their shuffle data ...
>>>>>>>>
>>>>>>>> Since the consolidation stage reads all the shuffle data, why not
>>>>>>>> doing the transformation in that stage? What is the point in deferring 
>>>>>>>> the
>>>>>>>> transformations into another stage?
>>>>>>>>
>>>>>>>> You mention the "Native Shuffle Block Migration" and say its
>>>>>>>> limitation is "It simply shifts the storage burden to other active
>>>>>>>> executors".
>>>>>>>> Please consider that the migration process can migrate to a (in
>>>>>>>> Spark called) fallback storage, which essentially copies the shuffle 
>>>>>>>> data
>>>>>>>> to a remote storage.
>>>>>>>> Kind regards,
>>>>>>>> Enrico
>>>>>>>>
>>>>>>>> Am 13.11.25 um 01:40 schrieb karuppayya:
>>>>>>>>
>>>>>>>>  Hi All,
>>>>>>>>
>>>>>>>> I propose to utilize *Remote Storage as a Shuffle Store, natively
>>>>>>>> in Spark* .
>>>>>>>>
>>>>>>>> This approach would fundamentally decouple shuffle storage from
>>>>>>>> compute nodes, mitigating *shuffle fetch failures and also help
>>>>>>>> with aggressive downscaling*.
>>>>>>>>
>>>>>>>> The primary goal is to enhance the *elasticity and resilience* of
>>>>>>>> Spark workloads, leading to substantial cost optimization 
>>>>>>>> opportunities.
>>>>>>>>
>>>>>>>> *I welcome any initial thoughts or concerns regarding this idea.*
>>>>>>>> *Looking forward to your feedback! *
>>>>>>>>
>>>>>>>> JIRA: SPARK-53484
>>>>>>>> <https://issues.apache.org/jira/browse/SPARK-54327>
>>>>>>>> SPIP doc
>>>>>>>> <https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>
>>>>>>>> ,
>>>>>>>> Design doc
>>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
>>>>>>>> PoC PR <https://github.com/apache/spark/pull/53028>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Karuppayya
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Regards
>>>>>> Rishab Joshi
>>>>>>
>>>>>
>

Reply via email to