+1, I agree with what Mridul said.

Regards,
Jie Yang

On 2025/12/02 00:48:43 Mridul Muralidharan wrote:
> I am actually not in favor of this proposal, given there are existing solutions - including various Apache projects - which handle this, including reading/writing to cloud storage, without needing a change to how Spark itself operates.
> The benefits, compared to existing approaches, are not clear.
>
> Regards,
> Mridul
>
> On Mon, Dec 1, 2025 at 1:47 PM Aaron Dantley <[email protected]> wrote:
>
> > Please remove me from this distribution list.
> >
> > Thanks
> >
> > On Mon, Dec 1, 2025 at 2:33 PM karuppayya <[email protected]> wrote:
> >
> >> Hi everyone,
> >> Thank you all for your valuable comments and discussion on the design document and in this email thread. I have replied to the comments/concerns raised. I welcome any further questions and am happy to be challenged further.
> >>
> >> Also, *Sun Chao* accepted to shepherd this proposal (thank you!).
> >>
> >> If there are no other open questions by Wednesday morning (PST), I will request Chao to open the official voting thread (which should give 72 hours for the process <https://spark.apache.org/improvement-proposals.html>).
> >>
> >> - Karuppayya
> >>
> >> On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua <[email protected]> wrote:
> >>
> >>> One aspect that hasn't been mentioned yet (or so I think) is the thread-level behavior of shuffle. In large workloads with many small shuffle blocks, I've repeatedly observed executors spawning hundreds of threads tied to shuffle fetch operations, Netty client handlers, and block file access.
> >>> Since the proposal changes block granularity and fetch patterns, it would be valuable to explicitly consider how the consolidation stage affects:
> >>> – the number of concurrent fetch operations
> >>> – thread pool growth / saturation
> >>> – Netty transport threads
> >>> – memory pressure from large in-flight reads
> >>>
> >>> Btw, I find your proposal quite interesting.
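> >>>
> >>> For context, these concerns already map onto existing knobs; here is a minimal sketch of the settings that bound fetch concurrency, in-flight memory, and Netty transport threads today (the values below are illustrative, not recommendations):
> >>>
> >>>   import org.apache.spark.SparkConf
> >>>
> >>>   // Illustrative sketch of the existing limits a consolidation stage would interact with
> >>>   val conf = new SparkConf()
> >>>     .set("spark.reducer.maxSizeInFlight", "48m")              // cap on in-flight fetched bytes per reduce task
> >>>     .set("spark.reducer.maxReqsInFlight", "64")               // cap on concurrent fetch requests
> >>>     .set("spark.reducer.maxBlocksInFlightPerAddress", "128")  // cap on blocks fetched per remote host
> >>>     .set("spark.shuffle.io.clientThreads", "8")               // Netty client transport threads
> >>>     .set("spark.shuffle.io.serverThreads", "8")               // Netty server transport threads
> >>>
> >>> With fewer, larger consolidated blocks, the pressure presumably shifts from request counts toward in-flight bytes.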
> >>> On Tue, 18 Nov 2025, 19:33, karuppayya <[email protected]> wrote:
> >>>
> >>>> Rishab, Wenchen, Murali,
> >>>> Thank you very much for taking the time to review the proposal and for providing such thoughtful and insightful comments/questions.
> >>>>
> >>>> *Rishab*,
> >>>>
> >>>>> *suitable across all storage systems*
> >>>>
> >>>> You are right that the suitability is somewhat subjective and depends on the cloud provider used. In general, the goal of ShuffleVault is to utilize the standard Hadoop FileSystem APIs, which means it should work seamlessly with popular cloud and distributed file systems (like S3, HDFS, GFS, etc.). These systems share a similar nature and are designed for large files.
> >>>>
> >>>>> *large file could create problems during retrieval.*
> >>>>
> >>>> We are mitigating this risk by ensuring that tasks do not read the entire consolidated file at once. Instead, the implementation is designed to read the data in configured blocks, rather than relying on a single read. *This behavior can be refined/validated* to make it more robust.
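> >>>>
> >>>> For illustration, the block-wise read pattern could look roughly like this (a sketch against the standard Hadoop FileSystem API; the helper and chunk size are hypothetical, not the actual ShuffleVault code):
> >>>>
> >>>>   import org.apache.hadoop.conf.Configuration
> >>>>   import org.apache.hadoop.fs.Path
> >>>>
> >>>>   // Sketch: read a reducer's byte range from a consolidated file in fixed-size
> >>>>   // chunks instead of one large read, so a broken connection only retries a chunk.
> >>>>   def readRange(path: Path, start: Long, length: Long, chunkSize: Int = 4 * 1024 * 1024): Unit = {
> >>>>     val fs = path.getFileSystem(new Configuration())
> >>>>     val in = fs.open(path)
> >>>>     try {
> >>>>       val buf = new Array[Byte](chunkSize)
> >>>>       var pos = start
> >>>>       while (pos < start + length) {
> >>>>         val toRead = math.min(chunkSize.toLong, start + length - pos).toInt
> >>>>         in.readFully(pos, buf, 0, toRead)  // positioned read of one chunk
> >>>>         // ... hand buf(0 until toRead) to the record reader ...
> >>>>         pos += toRead
> >>>>       }
> >>>>     } finally in.close()
> >>>>   }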
> >>>>
> >>>> *Wenchen,*
> >>>> I fully agree that the operational details around using cloud storage for shuffle (traffic throttling, cleanup guarantees, and overall request-related network cost) are critical issues that must be solved. The consolidation stage is explicitly designed to mitigate the throttling and accompanying cost issues.
> >>>> *Throttling* - By consolidating shuffle data, this approach transforms the read pattern from a multitude of small, random requests into fewer, large, targeted ones, which is particularly beneficial for modern cloud object storage.
> >>>> *Shuffle cleanup* - I am actively trying to leverage the shuffle cleanup modes and also making an effort to make them more robust. These cleanup improvements should be beneficial regardless of this proposal and cover most cases. However, I agree that to ensure *no orphaned files* remain, we will still require other means (such as remote storage lifecycle policies or job-specific scripts) for a guaranteed cleanup.
> >>>> Thank you again for your valuable feedback, especially the validation on synchronous scheduling and AQE integration.
> >>>>
> >>>> *Murali,*
> >>>>
> >>>>> *Doing an explicit sort stage*
> >>>>
> >>>> To clarify, ShuffleVault does not introduce an explicit sort stage. Instead, it introduces a Shuffle Consolidation Stage. This stage is a pure passthrough operation that only aggregates scattered shuffle data for a given reducer partition. In simple terms, it functions as an additional reducer stage that reads the fragmented shuffle files from the mappers and writes them as a single, consolidated, durable file to remote storage.
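> >>>>
> >>>> Conceptually (a hand-wavy sketch, not the code in the PR), the consolidation task for one reducer partition is just a byte-level concatenation:
> >>>>
> >>>>   import java.io.{InputStream, OutputStream}
> >>>>
> >>>>   // Sketch: pure passthrough consolidation for one reducer partition.
> >>>>   // Each fragment is that partition's slice of one mapper's output;
> >>>>   // no decoding, sorting, or aggregation happens here.
> >>>>   def consolidate(fragments: Seq[InputStream], out: OutputStream): Unit = {
> >>>>     val buf = new Array[Byte](1 << 20)  // 1 MiB copy buffer
> >>>>     fragments.foreach { in =>
> >>>>       try {
> >>>>         var n = in.read(buf)
> >>>>         while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
> >>>>       } finally in.close()
> >>>>     }
> >>>>   }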
> >>>>
> >>>>> *but that would be a nontrivial change*
> >>>>
> >>>> I agree that the change is significant, and I am actively working to ensure the benefits are leveraged across the stack. This PR <https://github.com/apache/spark/pull/53028> demonstrates integration with AQE and interactions with other rules (Exchange Reuse, Shuffle Partition Coalescing, etc.).
> >>>> I would genuinely appreciate it if you could take a look at the POC PR to see the scope of changes. The primary logic is encapsulated within a new Spark Physical Planner Rule <https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6> that injects the consolidation stage, which is the crux of the change.
> >>>>
> >>>> I welcome any further questions or comments!
> >>>>
> >>>> Thanks & Regards
> >>>> Karuppayya
> >>>>
> >>>> On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan <[email protected]> wrote:
> >>>>
> >>>>> There are existing Apache projects which provide capabilities that largely address the problem statement - Apache Celeborn, Apache Uniffle, Zeus, etc.
> >>>>> Doing an explicit sort stage between "map" and "reduce" brings with it some nice advantages, especially if the output is durable, but that would be a nontrivial change - and should be attempted only if the benefits are being leveraged throughout the stack (AQE, speculative execution, etc.).
> >>>>>
> >>>>> Regards,
> >>>>> Mridul
> >>>>>
> >>>>> On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan <[email protected]> wrote:
> >>>>>
> >>>>>> Hi Karuppayya,
> >>>>>>
> >>>>>> Handling large shuffles in Spark is challenging, and it's great to see proposals addressing it. I think the extra "shuffle consolidation stage" is a good idea, and now I feel it's better for it to be synchronous, so that we can integrate it with AQE and leverage the accurate runtime shuffle status to make decisions about whether or not to launch this extra "shuffle consolidation stage" and how to consolidate. This is a key differentiator compared to the push-based shuffle.
> >>>>>>
> >>>>>> However, there are many details to consider, and in general it's difficult to use cloud storage for shuffle. We need to deal with problems like traffic throttling, cleanup guarantees, cost control, and so on. Let's take a step back and see what the actual problems of large shuffles are.
> >>>>>>
> >>>>>> A large shuffle usually starts with a large number of mappers that we can't adjust (e.g. a large table scan). We can adjust the number of reducers to reach two goals:
> >>>>>> 1. The input data size of each reducer shouldn't be too large, which is roughly *total_shuffle_size / num_reducers*. This is to avoid spilling/OOM during reducer task execution.
> >>>>>> 2. The data size of each shuffle block shouldn't be too small, which is roughly *total_shuffle_size / (num_mappers * num_reducers)*. This is for the good of disk/network IO.
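> >>>>>>
> >>>>>> A quick back-of-the-envelope example of why these conflict (all numbers hypothetical):
> >>>>>>
> >>>>>>   // Hypothetical sizing: 10 TiB total shuffle, 10,000 mappers.
> >>>>>>   val totalBytes  = 10L << 40                  // 10 TiB
> >>>>>>   val numMappers  = 10000L
> >>>>>>   val numReducers = totalBytes / (512L << 20)  // goal 1: <= 512 MiB per reducer => 20480 reducers
> >>>>>>   val blockBytes  = totalBytes / (numMappers * numReducers)
> >>>>>>   // blockBytes is about 52 KiB per shuffle block, far below a healthy IO size (goal 2).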
> >>>>>> These two goals are actually contradictory, and sometimes we have to prioritize goal 1 (i.e. pick a large *num_reducers*) so that the query can finish. An extra "shuffle consolidation stage" can effectively decrease the number of mappers by merging the shuffle files from multiple mappers. This can be a clear win, as fetching many small shuffle blocks can be quite slow, even slower than running an extra "shuffle consolidation stage".
> >>>>>>
> >>>>>> In addition, the nodes that host shuffle files shouldn't be too many (ideally zero, meaning shuffle files are stored in separate storage). With a large number of mappers, likely every node in the cluster stores some shuffle files. By merging shuffle files via the extra "shuffle consolidation stage", we can decrease the number of nodes that host active shuffle data, so that the cluster is more elastic.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Wenchen
> >>>>>>
> >>>>>> On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi <[email protected]> wrote:
> >>>>>>
> >>>>>>> Hi Karuppayya,
> >>>>>>>
> >>>>>>> Thanks for sharing the proposal; this looks very exciting.
> >>>>>>>
> >>>>>>> I have a few questions; please correct me if I misunderstood anything.
> >>>>>>>
> >>>>>>> Would it be possible to clarify whether the consolidated shuffle file produced for each partition is suitable across all storage systems, especially when this file becomes extremely large? I am wondering if a very large file could create problems during retrieval. For example, if a connection breaks while reading the file, some storage systems may not support resuming reads from the point of failure and may start reading the file from the beginning again. Could this lead to higher latency, repeated retries, or performance bottlenecks when a partition becomes too large or skewed?
> >>>>>>>
> >>>>>>> Would it make sense to introduce a configurable upper bound on the maximum allowed file size? This might prevent the file from growing massively.
> >>>>>>> Should the consolidated shuffle file be compressed before being written to the storage system? Compression might introduce additional latency, but that too can be a configurable option.
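> >>>>>>>
> >>>>>>> If such knobs were added, they might look something like this (the config keys are entirely hypothetical, just to make the suggestion concrete):
> >>>>>>>
> >>>>>>>   import org.apache.spark.SparkConf
> >>>>>>>
> >>>>>>>   // Hypothetical configuration keys (not existing Spark configs):
> >>>>>>>   val conf = new SparkConf()
> >>>>>>>     .set("spark.shuffle.vault.maxConsolidatedFileSize", "2g")  // hypothetical: roll over to a new file past this size
> >>>>>>>     .set("spark.shuffle.vault.compression.enabled", "true")    // hypothetical: compress before the remote write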
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Rishab Joshi
> >>>>>>>
> >>>>>>> On Thu, Nov 13, 2025 at 9:14 AM karuppayya <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Enrico,
> >>>>>>>> Thank you very much for reviewing the doc.
> >>>>>>>>
> >>>>>>>>> *Since the consolidation stage reads all the shuffle data, why not do the transformation in that stage? What is the point of deferring the transformations to another stage?*
> >>>>>>>>
> >>>>>>>> The reason for deferring the final consolidation to a subsequent stage lies in the distributed nature of shuffle data. A reducer requires reading all corresponding shuffle data written across all map tasks. Since each mapper only holds its own local output, the consolidation cannot begin until the entire map stage completes.
> >>>>>>>>
> >>>>>>>> However, your question is also aligned with one of the approaches mentioned (concurrent consolidation <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>), which was specifically considered.
> >>>>>>>>
> >>>>>>>> While synchronous consolidation happens after all the data is available, concurrent consolidation can aggregate and persist the already-generated shuffle data concurrently with the remaining map tasks, thereby making the shuffle durable much earlier instead of having to wait for all map tasks to complete.
> >>>>>>>>
> >>>>>>>> - Karuppayya
> >>>>>>>>
> >>>>>>>> On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> another remark regarding a remote shuffle storage solution:
> >>>>>>>>> As long as the map executors are alive, reduce executors should read from them to avoid any extra delay / overhead.
> >>>>>>>>> On fetch failure from a map executor, the reduce executors should fall back to a remote storage that provides a copy (merged or not) of the shuffle data.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Enrico
> >>>>>>>>>
> >>>>>>>>> On 13.11.25 at 09:42, Enrico Minack wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Karuppayya,
> >>>>>>>>>
> >>>>>>>>> thanks for your proposal and for bringing up this issue.
> >>>>>>>>>
> >>>>>>>>> I am very much in favour of a shuffle storage solution that allows for dynamic allocation and node failure in a K8S environment, without the burden of managing a Remote Shuffle Service.
> >>>>>>>>>
> >>>>>>>>> I have the following comments:
> >>>>>>>>>
> >>>>>>>>> Your proposed consolidation stage is equivalent to the next reducer stage in the sense that it reads shuffle data from the earlier map stage. This requires the executors of the map stage to survive until the shuffle data are consolidated ("merged" in Spark terminology). Therefore, I think this passage of your design document is not accurate:
> >>>>>>>>>
> >>>>>>>>> Executors that perform the initial map tasks (shuffle writers) can be immediately deallocated after writing their shuffle data ...
> >>>>>>>>>
> >>>>>>>>> Since the consolidation stage reads all the shuffle data, why not do the transformation in that stage? What is the point of deferring the transformations to another stage?
> >>>>>>>>>
> >>>>>>>>> You mention the "Native Shuffle Block Migration" and say its limitation is "It simply shifts the storage burden to other active executors".
> >>>>>>>>> Please consider that the migration process can migrate to a fallback storage (as it is called in Spark), which essentially copies the shuffle data to a remote storage.
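> >>>>>>>>>
> >>>>>>>>> For readers unfamiliar with it, a minimal sketch of enabling the existing decommission fallback storage (the bucket path is just an example):
> >>>>>>>>>
> >>>>>>>>>   import org.apache.spark.SparkConf
> >>>>>>>>>
> >>>>>>>>>   // Existing decommission + fallback storage settings; values are illustrative.
> >>>>>>>>>   val conf = new SparkConf()
> >>>>>>>>>     .set("spark.decommission.enabled", "true")
> >>>>>>>>>     .set("spark.storage.decommission.enabled", "true")
> >>>>>>>>>     .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
> >>>>>>>>>     .set("spark.storage.decommission.fallbackStorage.path", "s3a://my-bucket/spark-fallback/")  // example path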
> >>>>>>>>> Kind regards,
> >>>>>>>>> Enrico
> >>>>>>>>>
> >>>>>>>>> On 13.11.25 at 01:40, karuppayya wrote:
> >>>>>>>>>
> >>>>>>>>> Hi All,
> >>>>>>>>>
> >>>>>>>>> I propose to utilize *Remote Storage as a Shuffle Store, natively in Spark*.
> >>>>>>>>>
> >>>>>>>>> This approach would fundamentally decouple shuffle storage from compute nodes, mitigating *shuffle fetch failures* and also helping with *aggressive downscaling*.
> >>>>>>>>>
> >>>>>>>>> The primary goal is to enhance the *elasticity and resilience* of Spark workloads, leading to substantial cost optimization opportunities.
> >>>>>>>>>
> >>>>>>>>> *I welcome any initial thoughts or concerns regarding this idea.*
> >>>>>>>>> *Looking forward to your feedback!*
> >>>>>>>>>
> >>>>>>>>> JIRA: SPARK-53484 <https://issues.apache.org/jira/browse/SPARK-54327>
> >>>>>>>>> SPIP doc <https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>,
> >>>>>>>>> Design doc <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
> >>>>>>>>> PoC PR <https://github.com/apache/spark/pull/53028>
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Karuppayya
> >>>>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Regards
> >>>>>>> Rishab Joshi
> >>>>>>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: [email protected]