+1, I agree with what Mridul said.

Regards
Jie Yang

On 2025/12/02 00:48:43 Mridul Muralidharan wrote:
> I am actually not in favor of this proposal, given there are existing
> solutions - including various Apache projects - which handle this,
> including reading/writing to cloud storage, without needing a change to how
> Spark itself operates.
> The benefits, compared to existing approaches, are not clear.
> 
> 
> Regards,
> Mridul
> 
> On Mon, Dec 1, 2025 at 1:47 PM Aaron Dantley <[email protected]> wrote:
> 
> > Please remove me from this distribution list.
> >
> > Thanks
> >
> > On Mon, Dec 1, 2025 at 2:33 PM karuppayya <[email protected]>
> > wrote:
> >
> >> Hi everyone,
> >> Thank you all for your valuable comments and discussion on the design
> >> document and this email thread. I have replied to the comments and
> >> concerns raised. I welcome any further questions or challenges.
> >>
> >> Also, *Sun Chao* has agreed to shepherd this proposal (thank you!).
> >>
> >> If there are no other open questions by Wednesday morning (PST), I will
> >> request Chao to open the official voting thread (which should give 72
> >> hours for the process
> >> <https://spark.apache.org/improvement-proposals.html>).
> >>
> >> - Karuppayya
> >>
> >> On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua <
> >> [email protected]> wrote:
> >>
> >>> One aspect that hasn’t been mentioned yet (or so I think) is the
> >>> thread-level behavior of shuffle. In large workloads with many small
> >>> shuffle blocks, I’ve repeatedly observed executors spawning hundreds of
> >>> threads tied to shuffle fetch operations, Netty client handlers, and block
> >>> file access.
> >>> Since the proposal changes block granularity and fetch patterns, it
> >>> would be valuable to explicitly consider how the consolidation stage
> >>> affects:
> >>> – the number of concurrent fetch operations
> >>> – thread pool growth / saturation
> >>> – Netty transport threads
> >>> – memory pressure from large in-flight reads
> >>>
> >>> Btw, I find your proposal quite interesting.
> >>>
> >>> On Tue, Nov 18, 2025, 19:33, karuppayya <[email protected]>
> >>> wrote:
> >>>
> >>>> Rishab, Wenchen, Murali,
> >>>> Thank you very much for taking the time to review the proposal and for
> >>>> providing such thoughtful and insightful comments/questions.
> >>>>
> >>>> *Rishab*,
> >>>>
> >>>>> * suitable across all storage systems*
> >>>>
> >>>> You are right that the suitability is somewhat subjective and depends
> >>>> on the cloud provider used.
> >>>> In general, the goal of ShuffleVault is to utilize the standard Hadoop
> >>>> FileSystem APIs, which means it should work seamlessly with popular cloud
> >>>> and distributed file systems (like S3, HDFS, GFS, etc.).
> >>>> These systems share a similar nature and are designed for large files.
> >>>>
> >>>> *large file could create problems during retrieval.*
> >>>>
> >>>> We are mitigating this risk by ensuring that tasks do not read the
> >>>> entire consolidated file at once.
> >>>> Instead, the implementation is designed to read the data in configured
> >>>> blocks, rather than relying on a single read. *This behavior can be
> >>>> refined/validated* to make it more robust.
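As a rough illustration of reading a consolidated file in configured blocks rather than in a single read, here is a minimal Python sketch. It is not taken from the PoC PR: the function name `read_in_blocks`, the `block_size` parameter, and the use of an in-memory stream as a stand-in for a remote object are all assumptions made for the example.

```python
import io
from typing import BinaryIO, Iterator


def read_in_blocks(stream: BinaryIO, offset: int, length: int,
                   block_size: int) -> Iterator[bytes]:
    """Yield the byte range [offset, offset + length) in bounded pieces.

    Hypothetical helper: each piece is at most `block_size` bytes, so a
    reader never holds the whole consolidated file in memory at once.
    """
    if block_size <= 0:
        raise ValueError("block_size must be positive")
    stream.seek(offset)
    remaining = length
    while remaining > 0:
        chunk = stream.read(min(block_size, remaining))
        if not chunk:
            break  # the stream ended before `length` bytes were available
        remaining -= len(chunk)
        yield chunk


# Usage: an in-memory stream stands in for the remote consolidated file.
consolidated = io.BytesIO(bytes(range(100)))
pieces = list(read_in_blocks(consolidated, offset=10, length=25, block_size=8))
```

Because each piece is requested separately, a failed read only needs to be retried from the start of that piece, not from the start of the file.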
> >>>>
> >>>> *Wenchen,*
> >>>> I fully agree that the operational details around using cloud storage
> >>>> for shuffle (specifically traffic throttling, cleanup guarantees, and
> >>>> overall request-related network cost) are critical issues that must
> >>>> be solved.
> >>>> The consolidation stage is explicitly designed to mitigate the
> >>>> throttling and accompanying cost issues.
> >>>> *Throttling* - By consolidating shuffle data, this approach transforms
> >>>> the read pattern from a multitude of small, random requests into fewer,
> >>>> large, targeted ones. This is particularly beneficial for modern cloud
> >>>> object storage.
> >>>> *Shuffle cleanup* - I am actively trying to leverage the shuffle
> >>>> cleanup mode and am also working to make it more robust. These cleanup
> >>>> improvements should be beneficial regardless of this proposal and cover
> >>>> most cases.
> >>>> However, I agree that to ensure *no orphaned files* remain, we will
> >>>> still require other means (such as remote storage lifecycle policies or
> >>>> job-specific scripts) for a guaranteed cleanup.
> >>>> Thank you again for your valuable feedback, especially the validation
> >>>> on synchronous scheduling and AQE integration.
> >>>>
> >>>> *Murali,*
> >>>>
> >>>>> * Doing an explicit sort stage*
> >>>>
> >>>> To clarify, ShuffleVault does not introduce an explicit sort stage.
> >>>> Instead, it introduces a Shuffle Consolidation Stage.
> >>>> This stage is a pure passthrough operation that only aggregates
> >>>> scattered shuffle data for a given reducer partition.
> >>>> In simple terms, it functions as an additional reducer stage that reads
> >>>> the fragmented shuffle files from the mappers and writes them as a
> >>>> single, consolidated, durable file to remote storage.
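The passthrough nature of the consolidation stage can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not the logic in the PoC PR: `consolidate_partition` is a hypothetical name, the blocks are plain byte strings, and the returned (offset, length) index is an assumption added so that individual map outputs could still be located inside the consolidated file.

```python
import io
from typing import BinaryIO, Iterable, List, Tuple


def consolidate_partition(blocks: Iterable[bytes],
                          out: BinaryIO) -> List[Tuple[int, int]]:
    """Write the per-mapper blocks of one reducer partition back to back.

    Hypothetical sketch: the bytes are copied unchanged (no sorting or
    aggregation), and an (offset, length) entry is recorded per block.
    """
    index: List[Tuple[int, int]] = []
    offset = 0
    for block in blocks:
        out.write(block)
        index.append((offset, len(block)))
        offset += len(block)
    return index


# Usage: three mappers produced output for the same reducer partition.
out = io.BytesIO()  # stands in for a file on remote storage
index = consolidate_partition([b"aa", b"", b"bcd"], out)
```

The reducer that later consumes this partition then issues one large sequential read instead of one small fetch per mapper.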
> >>>>
> >>>> *but that would be a nontrivial change *
> >>>>
> >>>> I agree that the change is significant; I am actively working to
> >>>> ensure the benefits are leveraged across the stack. This PR
> >>>> <https://github.com/apache/spark/pull/53028> demonstrates integration
> >>>> with AQE and interactions with other rules (Exchange reuse, Shuffle
> >>>> Partition Coalescing, etc.).
> >>>> I would genuinely appreciate it if you could take a look at the POC PR
> >>>> to see the scope of changes. The primary logic is encapsulated within a
> >>>> new Spark Physical Planner Rule
> >>>> <https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6>
> >>>> that injects the consolidation stage, which is the main crux.
> >>>>
> >>>> I welcome any further questions or comments!
> >>>>
> >>>> Thanks & Regards
> >>>> Karuppayya
> >>>>
> >>>> On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan <[email protected]>
> >>>> wrote:
> >>>>
> >>>>>
> >>>>> There are existing Apache projects which provide capabilities that
> >>>>> largely address the problem statement - Apache Celeborn, Apache
> >>>>> Uniffle, Zeus, etc.
> >>>>> Doing an explicit sort stage between "map" and "reduce" brings with
> >>>>> it some nice advantages, especially if the output is durable, but that
> >>>>> would be a nontrivial change - and should be attempted if the benefits
> >>>>> are being leveraged throughout the stack (AQE, speculative execution,
> >>>>> etc.)
> >>>>>
> >>>>> Regards,
> >>>>> Mridul
> >>>>>
> >>>>> On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan <[email protected]>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Karuppayya,
> >>>>>>
> >>>>>> Handling large shuffles in Spark is challenging and it's great to see
> >>>>>> proposals addressing it. I think the extra "shuffle consolidation
> >>>>>> stage" is a good idea, and now I feel it's better for it to be
> >>>>>> synchronous, so that we can integrate it with AQE and leverage the
> >>>>>> accurate runtime shuffle status to make decisions about whether or not
> >>>>>> to launch this extra "shuffle consolidation stage" and how to
> >>>>>> consolidate. This is a key differentiator compared to the push-based
> >>>>>> shuffle.
> >>>>>>
> >>>>>> However, there are many details to consider, and in general it's
> >>>>>> difficult to use cloud storage for shuffle. We need to deal with
> >>>>>> problems like traffic throttling, cleanup guarantees, cost control,
> >>>>>> and so on. Let's take a step back and see what the actual problems of
> >>>>>> large shuffles are.
> >>>>>>
> >>>>>> A large shuffle usually starts with a large number of mappers that we
> >>>>>> can't adjust (e.g. a large table scan). We can adjust the number of
> >>>>>> reducers to reach two goals:
> >>>>>> 1. The input data size of each reducer shouldn't be too large, which
> >>>>>> is roughly *total_shuffle_size / num_reducers*. This is to avoid
> >>>>>> spilling/OOM during reducer task execution.
> >>>>>> 2. The data size of each shuffle block shouldn't be too small, which
> >>>>>> is roughly *total_shuffle_size / (num_mappers * num_reducers)*. This
> >>>>>> is for the good of disk/network IO.
> >>>>>>
> >>>>>> These two goals are actually contradictory and sometimes we have to
> >>>>>> prioritize goal 1 (i.e. pick a large *num_reducers*) so that the
> >>>>>> query can finish. An extra "shuffle consolidation stage" can
> >>>>>> effectively decrease the number of mappers by merging the shuffle
> >>>>>> files from multiple mappers. This can be a clear win, as fetching many
> >>>>>> small shuffle blocks can be quite slow, even slower than running an
> >>>>>> extra "shuffle consolidation stage".
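The tension between the two goals, and the effect of merging map outputs, can be made concrete with a small Python sketch. The two formulas are the ones given above; the function names and the `merge_factor` parameter (how many mappers' outputs are merged into one file) are hypothetical and only serve the illustration.

```python
def reducer_input_size(total_shuffle_size: float, num_reducers: int) -> float:
    """Goal 1: approximate input size of each reducer."""
    return total_shuffle_size / num_reducers


def shuffle_block_size(total_shuffle_size: float, num_mappers: int,
                       num_reducers: int) -> float:
    """Goal 2: approximate size of each individual shuffle block."""
    return total_shuffle_size / (num_mappers * num_reducers)


def merged_block_size(total_shuffle_size: float, num_mappers: int,
                      num_reducers: int, merge_factor: int) -> float:
    """Block size after merging `merge_factor` mappers' outputs into one.

    `merge_factor` is a hypothetical knob: merging lowers the effective
    number of mappers, so each block grows by the same factor.
    """
    effective_mappers = num_mappers / merge_factor
    return total_shuffle_size / (effective_mappers * num_reducers)


# Usage with illustrative numbers: raising num_reducers shrinks the
# per-reducer input (good for goal 1) but also shrinks every block
# (bad for goal 2); merging map outputs grows the blocks again.
total = 1_000_000.0
per_reducer = reducer_input_size(total, num_reducers=100)
per_block = shuffle_block_size(total, num_mappers=50, num_reducers=100)
per_merged = merged_block_size(total, num_mappers=50, num_reducers=100,
                               merge_factor=10)
```

Merging does not change the per-reducer input size, which is why the consolidation stage can help goal 2 without giving up goal 1.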
> >>>>>>
> >>>>>> In addition, the number of nodes that host shuffle files shouldn't be
> >>>>>> too large (ideally 0, which means shuffle files are stored in separate
> >>>>>> storage). With a large number of mappers, likely every node in the
> >>>>>> cluster stores some shuffle files. By merging shuffle files via the
> >>>>>> extra "shuffle consolidation stage", we can decrease the number of
> >>>>>> nodes that host active shuffle data, so that the cluster is more
> >>>>>> elastic.
> >>>>>>
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Wenchen
> >>>>>>
> >>>>>> On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi <[email protected]>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Karuppayya,
> >>>>>>>
> >>>>>>> Thanks for sharing the proposal and this looks very exciting.
> >>>>>>>
> >>>>>>> I have a few questions and please correct me if I misunderstood
> >>>>>>> anything.
> >>>>>>>
> >>>>>>> Would it be possible to clarify whether the consolidated shuffle
> >>>>>>> file produced for each partition is suitable across all storage
> >>>>>>> systems, especially when this file becomes extremely large? I am
> >>>>>>> wondering if a very large file could create problems during
> >>>>>>> retrieval. For example, if a connection breaks while reading the
> >>>>>>> file, some storage systems may not support resuming reads from the
> >>>>>>> point of failure and may start reading the file from the beginning
> >>>>>>> again. This could lead to higher latency, repeated retries, or
> >>>>>>> performance bottlenecks when a partition becomes too large or skewed.
> >>>>>>>
> >>>>>>> Would it make sense to introduce a configurable upper bound on the
> >>>>>>> maximum allowed file size? This might prevent the file from growing
> >>>>>>> massively.
> >>>>>>> Should the consolidated shuffle file be compressed before being
> >>>>>>> written to the storage system? Compression might introduce additional
> >>>>>>> latency, but that too can be a configurable option.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Rishab Joshi
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Nov 13, 2025 at 9:14 AM karuppayya <[email protected]>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Enrico,
> >>>>>>>> Thank you very much for reviewing the doc.
> >>>>>>>>
> >>>>>>>> *Since the consolidation stage reads all the shuffle data, why not
> >>>>>>>>> do the transformation in that stage? What is the point in
> >>>>>>>>> deferring the transformations into another stage?*
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> The reason for deferring the final consolidation to a subsequent
> >>>>>>>> stage lies in the distributed nature of shuffle data.
> >>>>>>>> A reducer needs to read all of its corresponding shuffle data
> >>>>>>>> written across all map tasks. Since each mapper only holds its own
> >>>>>>>> local output, the consolidation cannot begin until the entire map
> >>>>>>>> stage completes.
> >>>>>>>>
> >>>>>>>> However, your question is also aligned to one of the approaches
> >>>>>>>> mentioned (concurrent consolidation
> >>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>),
> >>>>>>>> which was specifically considered.
> >>>>>>>>
> >>>>>>>> While synchronous consolidation happens after all the data is
> >>>>>>>> available, concurrent consolidation can begin aggregating and
> >>>>>>>> persisting the already-generated shuffle data while the remaining
> >>>>>>>> map tasks are still running, thereby making the shuffle durable much
> >>>>>>>> earlier instead of having to wait for all map tasks to complete.
> >>>>>>>>
> >>>>>>>> - Karuppayya
> >>>>>>>>
> >>>>>>>> On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack <[email protected]>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> another remark regarding a remote shuffle storage solution:
> >>>>>>>>> As long as the map executors are alive, reduce executors should
> >>>>>>>>> read from them to avoid any extra delay / overhead.
> >>>>>>>>> On fetch failure from a map executor, the reduce executors should
> >>>>>>>>> fall back to a remote storage that provides a copy (merged or not)
> >>>>>>>>> of the shuffle data.
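The read path suggested here (prefer the live map executor, fall back to the remote copy on fetch failure) can be sketched as follows. This is a minimal Python illustration, not Spark code: `fetch_block` and the two fetcher callables are hypothetical, and `ConnectionError` is used as a stand-in for whatever exception signals a fetch failure.

```python
from typing import Callable


def fetch_block(block_id: str,
                fetch_from_executor: Callable[[str], bytes],
                fetch_from_remote: Callable[[str], bytes]) -> bytes:
    """Try the map executor first; use remote storage only on failure.

    Hypothetical sketch: ConnectionError stands in for a fetch failure
    from a map executor that has gone away.
    """
    try:
        return fetch_from_executor(block_id)
    except ConnectionError:
        return fetch_from_remote(block_id)


# Usage: the executor is gone, so the remote copy is served instead.
def dead_executor(block_id: str) -> bytes:
    raise ConnectionError(f"executor holding {block_id} is unreachable")


def remote_store(block_id: str) -> bytes:
    return b"copy-of-" + block_id.encode()


data = fetch_block("shuffle_0_1_2", dead_executor, remote_store)
```

With this ordering the remote storage only adds latency on the failure path, which matches the intent of avoiding extra delay while the map executors are alive.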
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Enrico
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 13.11.25 at 09:42, Enrico Minack wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Karuppayya,
> >>>>>>>>>
> >>>>>>>>> thanks for your proposal and bringing up this issue.
> >>>>>>>>>
> >>>>>>>>> I am very much in favour of a shuffle storage solution that allows
> >>>>>>>>> for dynamic allocation and node failure in a K8S environment,
> >>>>>>>>> without the burden of managing a Remote Shuffle Service.
> >>>>>>>>>
> >>>>>>>>> I have the following comments:
> >>>>>>>>>
> >>>>>>>>> Your proposed consolidation stage is equivalent to the next
> >>>>>>>>> reducer stage in the sense that it reads shuffle data from the
> >>>>>>>>> earlier map stage. This requires the executors of the map stage to
> >>>>>>>>> survive until the shuffle data are consolidated ("merged" in Spark
> >>>>>>>>> terminology). Therefore, I think this passage of your design
> >>>>>>>>> document is not accurate:
> >>>>>>>>>
> >>>>>>>>>     Executors that perform the initial map tasks (shuffle writers)
> >>>>>>>>> can be immediately deallocated after writing their shuffle data ...
> >>>>>>>>>
> >>>>>>>>> Since the consolidation stage reads all the shuffle data, why not
> >>>>>>>>> do the transformation in that stage? What is the point in
> >>>>>>>>> deferring the transformations into another stage?
> >>>>>>>>>
> >>>>>>>>> You mention the "Native Shuffle Block Migration" and say its
> >>>>>>>>> limitation is "It simply shifts the storage burden to other active
> >>>>>>>>> executors".
> >>>>>>>>> Please consider that the migration process can migrate to what
> >>>>>>>>> Spark calls a fallback storage, which essentially copies the
> >>>>>>>>> shuffle data to a remote storage.
> >>>>>>>>> Kind regards,
> >>>>>>>>> Enrico
> >>>>>>>>>
> >>>>>>>>> On 13.11.25 at 01:40, karuppayya wrote:
> >>>>>>>>>
> >>>>>>>>>  Hi All,
> >>>>>>>>>
> >>>>>>>>> I propose to utilize *Remote Storage as a Shuffle Store, natively
> >>>>>>>>> in Spark*.
> >>>>>>>>>
> >>>>>>>>> This approach would fundamentally decouple shuffle storage from
> >>>>>>>>> compute nodes, mitigating *shuffle fetch failures and also helping
> >>>>>>>>> with aggressive downscaling*.
> >>>>>>>>>
> >>>>>>>>> The primary goal is to enhance the *elasticity and resilience* of
> >>>>>>>>> Spark workloads, leading to substantial cost optimization
> >>>>>>>>> opportunities.
> >>>>>>>>>
> >>>>>>>>> *I welcome any initial thoughts or concerns regarding this idea.*
> >>>>>>>>> *Looking forward to your feedback! *
> >>>>>>>>>
> >>>>>>>>> JIRA: SPARK-53484
> >>>>>>>>> <https://issues.apache.org/jira/browse/SPARK-54327>
> >>>>>>>>> SPIP doc
> >>>>>>>>> <https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>
> >>>>>>>>> ,
> >>>>>>>>> Design doc
> >>>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
> >>>>>>>>> PoC PR <https://github.com/apache/spark/pull/53028>
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Karuppayya
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Regards
> >>>>>>> Rishab Joshi
> >>>>>>>
> >>>>>>
> 

---------------------------------------------------------------------
To unsubscribe e-mail: [email protected]
