I am actually not in favor of this proposal, given that there are existing
solutions, including various Apache projects, which already handle this
(including reading from and writing to cloud storage) without needing a change
to how Spark itself operates.
The benefits, compared to existing approaches, are not clear.


Regards,
Mridul

On Mon, Dec 1, 2025 at 1:47 PM Aaron Dantley <[email protected]> wrote:

> Please remove me from this distribution list.
>
> Thanks
>
> On Mon, Dec 1, 2025 at 2:33 PM karuppayya <[email protected]>
> wrote:
>
>> Hi everyone,
>> Thank you all for your valuable comments and discussion on the design
>> document and in this email thread. I have replied to the comments and
>> concerns raised. I welcome any further questions and challenges.
>>
>> Also, *Sun Chao* has agreed to shepherd this proposal (thank you!).
>>
>> If there are no other open questions by Wednesday morning (PST), I will
>> ask Chao to open the official voting thread (which should stay open for
>> 72 hours, per the process
>> <https://spark.apache.org/improvement-proposals.html>).
>>
>> - Karuppayya
>>
>> On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua <
>> [email protected]> wrote:
>>
>>> One aspect that hasn’t been mentioned yet (or so I think) is the
>>> thread-level behavior of shuffle. In large workloads with many small
>>> shuffle blocks, I’ve repeatedly observed executors spawning hundreds of
>>> threads tied to shuffle fetch operations, Netty client handlers, and block
>>> file access.
>>> Since the proposal changes block granularity and fetch patterns, it
>>> would be valuable to explicitly consider how the consolidation stage
>>> affects:
>>> – the number of concurrent fetch operations
>>> – thread pool growth / saturation
>>> – Netty transport threads
>>> – memory pressure from large in-flight reads
>>>
>>> Btw, I find your proposal quite interesting.
>>>
>>> On Tue, 18 Nov 2025, 19:33, karuppayya <[email protected]>
>>> wrote:
>>>
>>>> Rishab, Wenchen, Murali,
>>>> Thank you very much for taking the time to review the proposal and for
>>>> providing such thoughtful and insightful comments/questions.
>>>>
>>>> *Rishab*,
>>>>
>>>>> *suitable across all storage systems*
>>>>
>>>> You are right that suitability is somewhat subjective and depends on the
>>>> cloud provider used.
>>>> In general, the goal of ShuffleVault is to utilize the standard Hadoop
>>>> FileSystem APIs, which means it should work seamlessly with popular cloud
>>>> and distributed file systems (like S3, HDFS, GFS, etc.).
>>>> These systems share a similar nature and are designed for large files.
>>>>
>>>> *large file could create problems during retrieval.*
>>>>
>>>> We are mitigating this risk by ensuring that tasks do not read the
>>>> entire consolidated file at once.
>>>> Instead, the implementation is designed to read the data in blocks of a
>>>> configured size rather than relying on a single read. *This behavior can
>>>> be refined/validated* to make it more robust.
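
A minimal sketch of the block-wise read described above, assuming a storage client that supports ranged reads (Hadoop's positioned `FSDataInputStream.read` and S3 range GETs both do). `read_in_blocks` and the `read_range(offset, length)` callback are hypothetical names, not code from the PoC PR. The point it illustrates is that a dropped connection only retries the current range instead of restarting from offset 0.

```python
from typing import Callable, Iterator

# Hypothetical callback: read_range(offset, length) returns up to `length`
# bytes starting at `offset`, and may raise ConnectionError mid-transfer.
RangeReader = Callable[[int, int], bytes]


def read_in_blocks(read_range: RangeReader, file_size: int,
                   block_size: int, max_retries: int = 3) -> Iterator[bytes]:
    """Stream a consolidated file as a sequence of bounded ranged reads.

    A failed request retries only the current range, so progress made so
    far is kept instead of re-reading the file from offset 0.
    """
    offset = 0
    while offset < file_size:
        length = min(block_size, file_size - offset)
        for attempt in range(max_retries + 1):
            try:
                chunk = read_range(offset, length)
                break
            except ConnectionError:
                if attempt == max_retries:
                    raise  # give up after max_retries retries of this range
        if not chunk:
            raise EOFError(f"unexpected end of data at offset {offset}")
        offset += len(chunk)  # tolerate short reads
        yield chunk


# Usage: a fake store whose third request fails once.
data = bytes(range(256)) * 40  # 10,240 bytes
calls = {"n": 0}


def flaky_read(offset: int, length: int) -> bytes:
    calls["n"] += 1
    if calls["n"] == 3:
        raise ConnectionError("connection reset")
    return data[offset:offset + length]


restored = b"".join(read_in_blocks(flaky_read, len(data), block_size=4096))
assert restored == data
```

The block size doubles as the retry granularity: smaller ranges lose less work on a failure but issue more requests, which is exactly the throttling trade-off discussed in this thread.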
>>>>
>>>> *Wenchen,*
>>>> I fully agree that the operational details of using cloud storage for
>>>> shuffle (specifically traffic throttling, cleanup guarantees, and overall
>>>> request-related network cost) are critical issues that must be solved.
>>>> The consolidation stage is explicitly designed to mitigate the
>>>> throttling and accompanying cost issues.
>>>> *Throttling* - By consolidating shuffle data, this approach transforms
>>>> the read pattern from a multitude of small, random requests into fewer,
>>>> large, targeted ones. This is particularly beneficial for modern cloud
>>>> object storage, which typically throttles and bills per request.
>>>> *Shuffle cleanup* - I am actively working to leverage the shuffle
>>>> cleanup mode and to make it more robust. These cleanup improvements
>>>> should be beneficial regardless of this proposal and should cover most
>>>> cases.
>>>> However, I agree that to ensure *no orphaned files* remain, we will
>>>> still require other means (such as remote storage lifecycle policies or
>>>> job-specific scripts) for a guaranteed cleanup.
>>>> Thank you again for your valuable feedback, especially the validation
>>>> on synchronous scheduling and AQE integration.
>>>>
>>>> *Murali,*
>>>>
>>>>> *Doing an explicit sort stage*
>>>>
>>>> To clarify, ShuffleVault does not introduce an explicit sort stage.
>>>> Instead, it introduces a Shuffle Consolidation Stage.
>>>> This stage is a pure passthrough operation that only aggregates
>>>> scattered shuffle data for a given reducer partition.
>>>> In simple terms, it functions as an additional reducer stage that reads
>>>> the fragmented shuffle files from the mappers and writes them as a single,
>>>> consolidated, durable file to remote storage.
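
To make the passthrough semantics concrete, here is a minimal in-memory sketch, assuming each map task's output is modeled as a dict from reducer partition id to bytes. `consolidate_partition` and the (mapper_id, offset, length) index are hypothetical, not the PoC's data structures; the real stage would write the payload to remote storage through the Hadoop FileSystem API, which is left out here.

```python
from typing import Dict, List, Tuple

# Hypothetical in-memory model: map_outputs[m][p] holds the bytes that map
# task m wrote for reducer partition p (a missing key means no data).
MapOutputs = List[Dict[int, bytes]]
IndexEntry = Tuple[int, int, int]  # (mapper_id, offset, length)


def consolidate_partition(map_outputs: MapOutputs,
                          partition: int) -> Tuple[bytes, List[IndexEntry]]:
    """Concatenate every mapper's block for one reducer partition.

    Bytes are copied verbatim (no sorting, aggregation or decompression),
    and an index records where each mapper's block landed in the result.
    """
    payload = bytearray()
    index: List[IndexEntry] = []
    for mapper_id, blocks in enumerate(map_outputs):
        block = blocks.get(partition, b"")
        if block:
            index.append((mapper_id, len(payload), len(block)))
            payload.extend(block)
    return bytes(payload), index


# Usage: three map tasks, two reducer partitions.
outputs = [{0: b"aa", 1: b"x"}, {1: b"yy"}, {0: b"b", 1: b"zzz"}]
print(consolidate_partition(outputs, 1))
# (b'xyyzzz', [(0, 0, 1), (1, 1, 2), (2, 3, 3)])
```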
>>>>
>>>> *but that would be a nontrivial change*
>>>>
>>>> I agree that the change is significant, and I am actively working to
>>>> ensure the benefits are leveraged across the stack. This PR
>>>> <https://github.com/apache/spark/pull/53028> demonstrates integration
>>>> with AQE and interactions with other rules (Exchange reuse, Shuffle
>>>> Partition Coalescing, etc.).
>>>> I would genuinely appreciate it if you could take a look at the PoC PR
>>>> to see the scope of the changes. The primary logic is encapsulated within
>>>> a new Spark physical planner rule
>>>> <https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6>
>>>> that injects the consolidation stage, which is the crux of the change.
>>>>
>>>> I welcome any further questions or comments!
>>>>
>>>> Thanks & Regards
>>>> Karuppayya
>>>>
>>>> On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan <[email protected]>
>>>> wrote:
>>>>
>>>>>
>>>>> There are existing Apache projects which provide capabilities that
>>>>> largely address the problem statement - Apache Celeborn, Apache
>>>>> Uniffle, Zeus, etc.
>>>>> Doing an explicit sort stage between "map" and "reduce" brings with
>>>>> it some nice advantages, especially if the output is durable, but that
>>>>> would be a nontrivial change - and should be attempted if the benefits are
>>>>> being leveraged throughout the stack (AQE, speculative execution, etc.).
>>>>>
>>>>> Regards,
>>>>> Mridul
>>>>>
>>>>> On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Karuppayya,
>>>>>>
>>>>>> Handling large shuffles in Spark is challenging and it's great to see
>>>>>> proposals addressing it. I think the extra "shuffle consolidation stage"
>>>>>> is a good idea, and now I feel it's better for it to be synchronous, so
>>>>>> that we can integrate it with AQE and leverage the accurate runtime
>>>>>> shuffle status to make decisions about whether or not to launch this
>>>>>> extra "shuffle consolidation stage" and how to consolidate. This is a key
>>>>>> differentiator compared to the push-based shuffle.
>>>>>>
>>>>>> However, there are many details to consider, and in general it's
>>>>>> difficult to use cloud storage for shuffle. We need to deal with problems
>>>>>> like traffic throttling, cleanup guarantees, cost control, and so on.
>>>>>> Let's take a step back and look at what the actual problems of large
>>>>>> shuffles are.
>>>>>>
>>>>>> Large shuffles usually start with a large number of mappers that we
>>>>>> can't adjust (e.g. a large table scan). We can adjust the number of
>>>>>> reducers to reach two goals:
>>>>>> 1. The input data size of each reducer shouldn't be too large, which
>>>>>> is roughly *total_shuffle_size / num_reducers*. This is to avoid
>>>>>> spilling/OOM during reducer task execution.
>>>>>> 2. The data size of each shuffle block shouldn't be too small, which
>>>>>> is roughly *total_shuffle_size / (num_mappers * num_reducers)*. This
>>>>>> is for the good of disk/network IO.
>>>>>>
>>>>>> These two goals are actually contradictory, and sometimes we have to
>>>>>> prioritize goal 1 (i.e. pick a large *num_reducers*) so that the
>>>>>> query can finish. An extra "shuffle consolidation stage" can effectively
>>>>>> decrease the number of mappers by merging the shuffle files from
>>>>>> multiple mappers. This can be a clear win, as fetching many small shuffle
>>>>>> blocks can be quite slow, even slower than running an extra "shuffle
>>>>>> consolidation stage".
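
Plugging illustrative numbers into the two formulas shows why merging helps. This is a back-of-the-envelope sketch, not Spark code: `shuffle_shape` and `merge_factor` are hypothetical, and the 10 TB / 100,000 mapper / 10,000 reducer figures are made up for illustration.

```python
from typing import NamedTuple


class ShuffleShape(NamedTuple):
    reducer_input_bytes: float  # goal 1: should not be too large
    block_bytes: float          # goal 2: should not be too small
    blocks_per_reducer: int     # blocks each reducer has to fetch


def shuffle_shape(total_bytes: int, num_mappers: int, num_reducers: int,
                  merge_factor: int = 1) -> ShuffleShape:
    """Estimate shuffle shape with the rough formulas
    total / num_reducers and total / (num_mappers * num_reducers).

    merge_factor is a hypothetical knob: a consolidation stage that merges
    the output of `merge_factor` mappers into one file, so reducers see
    num_mappers // merge_factor effective mappers.
    """
    effective_mappers = max(1, num_mappers // merge_factor)
    return ShuffleShape(
        reducer_input_bytes=total_bytes / num_reducers,
        block_bytes=total_bytes / (effective_mappers * num_reducers),
        blocks_per_reducer=effective_mappers,
    )


TB = 10**12
before = shuffle_shape(10 * TB, num_mappers=100_000, num_reducers=10_000)
after = shuffle_shape(10 * TB, num_mappers=100_000, num_reducers=10_000,
                      merge_factor=100)
print(before)  # 1 GB per reducer, 10 KB blocks, 100,000 blocks per reducer
print(after)   # 1 GB per reducer, 1 MB blocks, 1,000 blocks per reducer
```

Merging the output of 100 mappers leaves each reducer's input (goal 1) unchanged while making every block 100x larger and cutting the number of blocks each reducer fetches by 100x.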
>>>>>>
>>>>>> In addition, the number of nodes that host shuffle files should be small
>>>>>> (ideally 0, which means shuffle files are stored in separate storage).
>>>>>> With a large number of mappers, likely every node in the cluster stores
>>>>>> some shuffle files. By merging shuffle files via the extra "shuffle
>>>>>> consolidation stage", we can decrease the number of nodes that host
>>>>>> active shuffle data, so that the cluster is more elastic.
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Wenchen
>>>>>>
>>>>>> On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Karuppayya,
>>>>>>>
>>>>>>> Thanks for sharing the proposal; this looks very exciting.
>>>>>>>
>>>>>>> I have a few questions, so please correct me if I have misunderstood
>>>>>>> anything.
>>>>>>>
>>>>>>> Would it be possible to clarify whether the consolidated shuffle
>>>>>>> file produced for each partition is suitable across all storage systems,
>>>>>>> especially when this file becomes extremely large? I am wondering if a
>>>>>>> very large file could create problems during retrieval. For example, if
>>>>>>> a connection breaks while reading the file, some storage systems may not
>>>>>>> support resuming reads from the point of failure and may start reading
>>>>>>> the file from the beginning again. Could this lead to higher latency,
>>>>>>> repeated retries, or performance bottlenecks when a partition becomes
>>>>>>> too large or skewed?
>>>>>>>
>>>>>>> Would it make sense to introduce a configurable upper bound on the
>>>>>>> maximum allowed file size? This might prevent the file from growing
>>>>>>> massively.
>>>>>>> Should the consolidated shuffle file be compressed before being
>>>>>>> written to the storage system? Compression might introduce additional
>>>>>>> latency, but that too could be a configurable option.
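
One possible shape for these two suggestions, sketched under assumptions: whole map-output blocks are packed greedily into segments no larger than a configurable bound (so no block is split mid-record), and per-segment compression is an opt-in flag. `pack_segments`, `max_segment_bytes` and `compress` are hypothetical names rather than existing Spark configs, and zlib merely stands in for a real codec.

```python
import zlib
from typing import List


def pack_segments(blocks: List[bytes], max_segment_bytes: int,
                  compress: bool = False) -> List[bytes]:
    """Pack per-mapper blocks into segments of at most max_segment_bytes.

    Blocks are never split, so a single block larger than the bound ends up
    alone in its own (oversized) segment. If enabled, compression is applied
    to each segment after packing.
    """
    if max_segment_bytes <= 0:
        raise ValueError("max_segment_bytes must be positive")
    segments: List[bytes] = []
    current = bytearray()
    for block in blocks:
        # Start a new segment when adding this block would exceed the bound.
        if current and len(current) + len(block) > max_segment_bytes:
            segments.append(bytes(current))
            current = bytearray()
        current.extend(block)
    if current:
        segments.append(bytes(current))
    if compress:
        segments = [zlib.compress(s) for s in segments]
    return segments


# Usage: the 10-byte block exceeds the 8-byte bound and gets its own segment.
blocks = [b"a" * 4, b"b" * 4, b"c" * 10, b"d" * 3]
print(pack_segments(blocks, max_segment_bytes=8))
# [b'aaaabbbb', b'cccccccccc', b'ddd']
```

Spark already compresses map output by default (`spark.shuffle.compress` is `true`), so recompressing consolidated segments may add latency for little size reduction; packing at block boundaries also means a single oversized block can still exceed the bound.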
>>>>>>>
>>>>>>> Regards,
>>>>>>> Rishab Joshi
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Nov 13, 2025 at 9:14 AM karuppayya <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Enrico,
>>>>>>>> Thank you very much for reviewing the doc.
>>>>>>>>
>>>>>>>> *Since the consolidation stage reads all the shuffle data, why not
>>>>>>>>> do the transformation in that stage? What is the point in deferring
>>>>>>>>> the transformations into another stage?*
>>>>>>>>
>>>>>>>>
>>>>>>>> The reason for deferring the final consolidation to a subsequent
>>>>>>>> stage lies in the distributed nature of shuffle data.
>>>>>>>> A reducer needs to read all the corresponding shuffle data written
>>>>>>>> by every map task. Since each mapper holds only its own local output,
>>>>>>>> the consolidation cannot begin until the entire map stage completes.
>>>>>>>>
>>>>>>>> However, your question is also aligned with one of the approaches
>>>>>>>> mentioned (concurrent consolidation
>>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>),
>>>>>>>> which was specifically considered.
>>>>>>>>
>>>>>>>> While synchronous consolidation happens after all the data is
>>>>>>>> available, concurrent consolidation can begin aggregating and
>>>>>>>> persisting the already-generated shuffle data while the remaining map
>>>>>>>> tasks are still running, thereby making the shuffle durable much earlier
>>>>>>>> instead of having to wait for all map tasks to complete.
>>>>>>>>
>>>>>>>> - Karuppayya
>>>>>>>>
>>>>>>>> On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> another remark regarding a remote shuffle storage solution:
>>>>>>>>> As long as the map executors are alive, reduce executors should
>>>>>>>>> read from them to avoid any extra delay / overhead.
>>>>>>>>> On fetch failure from a map executor, the reduce executors should
>>>>>>>>> fall back to remote storage that provides a copy (merged or not) of
>>>>>>>>> the shuffle data.
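
The read order described above can be sketched as a simple try-then-fall-back, assuming two hypothetical fetch callables (one for the live map executor, one for the remote copy). `fetch_block` and `FetchFailed` are illustrative and are not Spark's `ShuffleBlockFetcherIterator` or `FetchFailedException`; the block id strings merely mimic Spark's `shuffle_<shuffleId>_<mapId>_<reduceId>` naming.

```python
from typing import Callable, Optional


class FetchFailed(Exception):
    """Hypothetical error raised when a map executor cannot serve a block."""


def fetch_block(block_id: str,
                from_executor: Callable[[str], bytes],
                from_remote: Callable[[str], Optional[bytes]]) -> bytes:
    """Read from the live map executor first; use the remote copy on failure.

    If the remote store has no copy either (it returns None), the original
    fetch failure is re-raised so the caller can fall back to recomputation.
    """
    try:
        return from_executor(block_id)
    except FetchFailed:
        data = from_remote(block_id)
        if data is None:
            raise  # re-raise the original FetchFailed
        return data


# Usage: map executor 1 is alive, map executor 3 has been decommissioned.
executor_blocks = {"shuffle_0_1_2": b"from executor"}
remote_blocks = {"shuffle_0_1_2": b"from remote", "shuffle_0_3_2": b"from remote"}


def read_executor(block_id: str) -> bytes:
    if block_id not in executor_blocks:
        raise FetchFailed(block_id)
    return executor_blocks[block_id]


print(fetch_block("shuffle_0_1_2", read_executor, remote_blocks.get))  # b'from executor'
print(fetch_block("shuffle_0_3_2", read_executor, remote_blocks.get))  # b'from remote'
```

Preferring the executor keeps the common case as fast as today's shuffle; the remote copy only pays its extra latency when an executor is already gone.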
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Enrico
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Am 13.11.25 um 09:42 schrieb Enrico Minack:
>>>>>>>>>
>>>>>>>>> Hi Karuppayya,
>>>>>>>>>
>>>>>>>>> thanks for your proposal and for bringing up this issue.
>>>>>>>>>
>>>>>>>>> I am very much in favour of a shuffle storage solution that allows
>>>>>>>>> for dynamic allocation and node failure in a K8S environment, without
>>>>>>>>> the burden of managing a Remote Shuffle Service.
>>>>>>>>>
>>>>>>>>> I have the following comments:
>>>>>>>>>
>>>>>>>>> Your proposed consolidation stage is equivalent to the next
>>>>>>>>> reducer stage in the sense that it reads shuffle data from the earlier
>>>>>>>>> map stage. This requires the executors of the map stage to survive
>>>>>>>>> until the shuffle data are consolidated ("merged" in Spark
>>>>>>>>> terminology). Therefore, I think this passage of your design document
>>>>>>>>> is not accurate:
>>>>>>>>>
>>>>>>>>>     Executors that perform the initial map tasks (shuffle writers)
>>>>>>>>> can be immediately deallocated after writing their shuffle data ...
>>>>>>>>>
>>>>>>>>> Since the consolidation stage reads all the shuffle data, why not
>>>>>>>>> do the transformation in that stage? What is the point in deferring
>>>>>>>>> the transformations into another stage?
>>>>>>>>>
>>>>>>>>> You mention the "Native Shuffle Block Migration" and say its
>>>>>>>>> limitation is "It simply shifts the storage burden to other active
>>>>>>>>> executors".
>>>>>>>>> Please consider that the migration process can migrate to what Spark
>>>>>>>>> calls a fallback storage, which essentially copies the shuffle data
>>>>>>>>> to remote storage.
>>>>>>>>>
>>>>>>>>> Kind regards,
>>>>>>>>> Enrico
>>>>>>>>>
>>>>>>>>> Am 13.11.25 um 01:40 schrieb karuppayya:
>>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I propose to utilize *Remote Storage as a Shuffle Store, natively
>>>>>>>>> in Spark*.
>>>>>>>>>
>>>>>>>>> This approach would fundamentally decouple shuffle storage from
>>>>>>>>> compute nodes, mitigating *shuffle fetch failures and also helping
>>>>>>>>> with aggressive downscaling*.
>>>>>>>>>
>>>>>>>>> The primary goal is to enhance the *elasticity and resilience* of
>>>>>>>>> Spark workloads, leading to substantial cost optimization
>>>>>>>>> opportunities.
>>>>>>>>>
>>>>>>>>> *I welcome any initial thoughts or concerns regarding this idea.*
>>>>>>>>> *Looking forward to your feedback!*
>>>>>>>>>
>>>>>>>>> JIRA: SPARK-53484
>>>>>>>>> <https://issues.apache.org/jira/browse/SPARK-54327>
>>>>>>>>> SPIP doc
>>>>>>>>> <https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>
>>>>>>>>> Design doc
>>>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
>>>>>>>>> PoC PR <https://github.com/apache/spark/pull/53028>
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Karuppayya
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
