Hi Mridul, Yang, Thank you for seeking further clarification.

*The benefits, compared to existing approaches, is not clear.*


I think these are the top-level benefits, among others:

   - *Minimal Operational Overhead* by removing the need to deploy,
   configure, and maintain an external service, which typically also requires
   continuous maintenance and demands specific system knowledge.
   - *Minimizes cost* by not requiring additional dedicated hardware for the
   service. Eliminating the need for continuous maintenance also lowers
   ongoing costs.

*without needing a change to how Spark itself operates.*


The benefits mentioned above come largely from having the feature in
Spark core.
Being an integral part of Spark allows the system to inherently leverage
Spark's other abilities that will be useful for this proposal. To highlight
a few:

   - Leveraging the *power of AQE* to make dynamic decisions.
   - Aggressive shuffle removal on query completion (essential to *reduce
   storage costs* associated with remote shuffle data).
   - *Better debuggability*, with monitoring/troubleshooting happening via
   the familiar Spark UI and Spark logs.
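
To illustrate the kind of dynamic decision that runtime shuffle statistics
make possible, here is a minimal sketch. It is not the logic in the PoC PR:
the names ShuffleStats and should_consolidate and the 1 MiB threshold are
assumptions made up for this example. The two averages are the rough
estimates Wenchen gave earlier in this thread.

```python
from dataclasses import dataclass

MIB = 1024 * 1024


@dataclass(frozen=True)
class ShuffleStats:
    """Runtime statistics of a finished map stage (hypothetical shape)."""
    total_bytes: int
    num_mappers: int
    num_reducers: int

    @property
    def avg_reducer_input_bytes(self) -> float:
        # Goal 1: keep this small enough to avoid spilling/OOM in reducers.
        return self.total_bytes / self.num_reducers

    @property
    def avg_block_bytes(self) -> float:
        # Goal 2: keep this large enough for efficient disk/network IO.
        return self.total_bytes / (self.num_mappers * self.num_reducers)


def should_consolidate(stats: ShuffleStats, min_block_bytes: int = MIB) -> bool:
    """Launch the extra consolidation stage only when blocks are tiny.

    The threshold is an assumed tuning knob, not an existing Spark config.
    """
    return stats.avg_block_bytes < min_block_bytes


# 1 TiB shuffled by 10,000 mappers into 2,000 reducers: blocks are only
# tens of KiB each, so consolidation looks worthwhile.
many_small_blocks = ShuffleStats(1024**4, num_mappers=10_000, num_reducers=2_000)

# Same data with 100 mappers and 200 reducers: blocks are tens of MiB each,
# so the extra stage would likely not pay off.
few_large_blocks = ShuffleStats(1024**4, num_mappers=100, num_reducers=200)
```

Because the decision only needs statistics that are known once the map stage
has finished, it fits a synchronous consolidation stage planned by AQE.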

I also acknowledge that existing Apache projects offer alternative
solutions.
These solutions can certainly co-exist, and the decision to use one approach
or another will mainly depend on the balance between *raw performance* and
*operational overhead*.
I would be happy to further discuss other limitations/concerns this
approach has.

Thanks & Regards
Karuppayya

On Mon, Dec 1, 2025 at 10:22 PM Yang Jie <[email protected]> wrote:

> +1, I agree with what Mridul said.
>
> Regards
> Jie Yang
>
> On 2025/12/02 00:48:43 Mridul Muralidharan wrote:
> > I am actually not in favor of this proposal, given there are existing
> > solutions - including various Apache projects - which handle this:
> > including reading/writing to cloud storage: without needing a change to
> how
> > Spark itself operates.
> > The benefits, compared to existing approaches, is not clear.
> >
> >
> > Regards,
> > Mridul
> >
> > On Mon, Dec 1, 2025 at 1:47 PM Aaron Dantley <[email protected]>
> wrote:
> >
> > > Please remove me from this distribution list.
> > >
> > > Thanks
> > >
> > > On Mon, Dec 1, 2025 at 2:33 PM karuppayya <[email protected]>
> > > wrote:
> > >
> > >> Hi everyone,
> > >> Thank you all for your valuable comments and discussion on the design
> > >> document/this email. I have replied to the comments/concerns raised.
> > >> I welcome any other questions and am happy to be challenged further.
> > >>
> > >> Also, *Sun Chao* has agreed to shepherd this proposal (thank you!).
> > >>
> > >> If there are no other open questions by Wednesday morning (PST), I
> will
> > >> request Chao to open the official voting thread (which should give 72
> > >> hours for the process
> > >> <https://spark.apache.org/improvement-proposals.html>).
> > >>
> > >> - Karuppayya
> > >>
> > >> On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua <
> > >> [email protected]> wrote:
> > >>
> > >>> One aspect that hasn’t been mentioned yet (or so I think) is the
> > >>> thread-level behavior of shuffle. In large workloads with many small
> > >>> shuffle blocks, I’ve repeatedly observed executors spawning hundreds
> of
> > >>> threads tied to shuffle fetch operations, Netty client handlers, and
> block
> > >>> file access.
> > >>> Since the proposal changes block granularity and fetch patterns, it
> > >>> would be valuable to explicitly consider how the consolidation stage
> > >>> affects:
> > >>> – the number of concurrent fetch operations
> > >>> – thread pool growth / saturation
> > >>> – Netty transport threads
> > >>> – memory pressure from large in-flight reads
> > >>>
> > >>> Btw, I find your proposal quite interesting.
> > >>>
> > >>> On Tue, Nov 18, 2025, 19:33, karuppayya <[email protected]>
> > >>> wrote:
> > >>>
> > >>>> Rishab, Wenchen, Murali,
> > >>>> Thank you very much for taking the time to review the proposal and
> for
> > >>>> providing such thoughtful and insightful comments/questions.
> > >>>>
> > >>>> *Rishab*,
> > >>>>
> > >>>>> * suitable across all storage systems*
> > >>>>
> > >>>> You are right that the suitability is somewhat subjective and
> dependent
> > >>>> on cloud provider used.
> > >>>> In general, the goal of ShuffleVault is to utilize the standard
> Hadoop
> > >>>> FileSystem APIs, which means it should work seamlessly with popular
> cloud
> > >>>> and distributed file systems (like S3, HDFS, GFS, etc.).
> > >>>> These systems share a similar nature and are designed for large
> files.
> > >>>>
> > >>>> *large file could create problems during retrieval.*
> > >>>>
> > >>>> We are mitigating this risk by ensuring that  tasks do not read the
> > >>>> entire consolidated file at once.
> > >>>> Instead, the implementation is designed to read the data in
> configured
> > >>>> blocks, rather than relying on a single read. *This behavior can be
> > >>>> refined/validated* to make it more robust.
> > >>>>
> > >>>> *Wenchen,*
> > >>>> I fully agree that the operational details around using cloud storage
> > >>>> for shuffle (specifically traffic throttling, cleanup guarantees,
> > >>>> and overall request-related network cost) are critical issues
> > >>>> that must be solved.
> > >>>> The consolidation stage is explicitly designed to mitigate the
> > >>>> throttling and accompanying cost issues.
> > >>>> *Throttling* - By consolidating shuffle data, this approach transforms
> > >>>> the read pattern from a multitude of small, random requests into fewer,
> > >>>> large, targeted ones. This is particularly beneficial for modern cloud
> > >>>> object storage.
> > >>>> *Shuffle cleanup* - I am actively working to leverage the shuffle
> > >>>> cleanup mode and to make the cleanup more robust. These cleanup
> > >>>> improvements should be beneficial regardless of this proposal and
> > >>>> should cover most cases.
> > >>>> However, I agree that to ensure *no orphaned files* remain, we will
> > >>>> still require other means (such as remote storage lifecycle
> policies or
> > >>>> job-specific scripts) for a guaranteed cleanup.
> > >>>> Thank you again for your valuable feedback, especially the
> validation
> > >>>> on synchronous scheduling and AQE integration.
> > >>>>
> > >>>> *Murali,*
> > >>>>
> > >>>>> * Doing an explicit sort stage*
> > >>>>
> > >>>> To clarify, ShuffleVault does not introduce an explicit sort stage.
> > >>>> Instead, it introduces a Shuffle Consolidation Stage.
> > >>>> This stage is a pure passthrough operation that only aggregates
> > >>>> scattered shuffle data for a given reducer partition.
> > >>>> In simple terms, it functions as an additional reducer stage that
> reads
> > >>>> the fragmented shuffle files from the mappers and writes them as a
> single,
> > >>>> consolidated, durable file to remote storage.
> > >>>>
> > >>>> *but that would be a nontrivial change *
> > >>>>
> > >>>> I agree that the change is significant. I am actively working to
> > >>>> ensure the benefits are leveraged across the stack. This PR
> > >>>> <https://github.com/apache/spark/pull/53028> demonstrates integration
> > >>>> with AQE and interactions with other rules (Exchange reuse, Shuffle
> > >>>> Partition Coalescing, etc.).
> > >>>> I would genuinely appreciate it if you could take a look at the POC PR
> > >>>> to see the scope of changes. The primary logic is encapsulated within a
> > >>>> new Spark Physical Planner Rule
> > >>>> <
> https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6
> >
> > >>>> that injects the consolidation stage, which is the main crux.
> > >>>>
> > >>>> I welcome any further questions or comments!
> > >>>>
> > >>>> Thanks & Regards
> > >>>> Karuppayya
> > >>>>
> > >>>> On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan <
> [email protected]>
> > >>>> wrote:
> > >>>>
> > >>>>>
> > >>>>> There are existing Apache projects that provide capabilities
> > >>>>> which largely address the problem statement - Apache Celeborn, Apache
> > >>>>> Uniffle, Zeus, etc.
> > >>>>> Doing an explicit sort stage, between "map" and "reduce" brings
> with
> > >>>>> it some nice advantages, especially if the output is durable, but
> that
> > >>>>> would be a nontrivial change - and should be attempted if the
> benefits are
> > >>>>> being leveraged throughout the stack (AQE, speculative execution,
> etc)
> > >>>>>
> > >>>>> Regards,
> > >>>>> Mridul
> > >>>>>
> > >>>>> On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan <[email protected]>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi Karuppayya,
> > >>>>>>
> > >>>>>> Handling large shuffles in Spark is challenging and it's great to
> see
> > >>>>>> proposals addressing it. I think the extra "shuffle consolidation
> stage" is
> > >>>>>> a good idea, and now I feel it's better for it to be synchronous,
> so that
> > >>>>>> we can integrate it with AQE and leverage the accurate runtime
> shuffle
> > >>>>>> status to make decisions about whether or not to launch this
> extra "shuffle
> > >>>>>> consolidation stage" and how to consolidate. This is a key
> differentiator
> > >>>>>> compared to the push-based shuffle.
> > >>>>>>
> > >>>>>> However, there are many details to consider, and in general it's
> > >>>>>> difficult to use cloud storage for shuffle. We need to deal with
> problems
> > >>>>>> like traffic throttling, cleanup guarantee, cost control, and so
> on. Let's
> > >>>>>> take a step back and see what are the actual problems of large
> shuffles.
> > >>>>>>
> > >>>>>> Large shuffle usually starts with a large number of mappers that
> we
> > >>>>>> can't adjust (e.g. large table scan). We can adjust the number of
> reducers
> > >>>>>> to reach two goals:
> > >>>>>> 1. The input data size of each reducer shouldn't be too large,
> which
> > >>>>>> is roughly *total_shuffle_size / num_reducers*. This is to avoid
> > >>>>>> spilling/OOM during reducer task execution.
> > >>>>>> 2. The data size of each shuffle block shouldn't be too small, which
> > >>>>>> is roughly *total_shuffle_size / (num_mappers * num_reducers)*.
> This
> > >>>>>> is for the good of disk/network IO.
> > >>>>>>
> > >>>>>> These two goals are actually contradictory and sometimes we have
> to
> > >>>>>> prioritize goal 1 (i.e. pick a large *num_reducers*) so that the
> > >>>>>> query can finish. An extra "shuffle consolidation stage" can kind
> of
> > >>>>>> decrease the number of mappers, by merging the shuffle files from
> multiple
> > >>>>>> mappers. This can be a clear win as fetching many small shuffle
> blocks can
> > >>>>>> be quite slow, even slower than running an extra "shuffle
> consolidation
> > >>>>>> stage".
> > >>>>>>
> > >>>>>> In addition, the nodes that host shuffle files shouldn't be too
> many
> > >>>>>> (best to be 0 which means shuffle files are stored in a different
> storage).
> > >>>>>> With a large number of mappers, likely every node in the cluster
> stores
> > >>>>>> some shuffle files. By merging shuffle files via the extra
> "shuffle
> > >>>>>> consolidation stage", we can decrease the number of nodes that
> host active
> > >>>>>> shuffle data, so that the cluster is more elastic.
> > >>>>>>
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Wenchen
> > >>>>>>
> > >>>>>> On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi <
> [email protected]>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi Karuppayya,
> > >>>>>>>
> > >>>>>>> Thanks for sharing the proposal and this looks very exciting.
> > >>>>>>>
> > >>>>>>> I have a few questions and please correct me if I misunderstood
> > >>>>>>> anything.
> > >>>>>>>
> > >>>>>>> Would it be possible to clarify whether the consolidated shuffle
> > >>>>>>> file produced for each partition is suitable across all storage
> systems,
> > >>>>>>> especially when this file becomes extremely large? I am
> wondering if a very
> > >>>>>>> large file could create problems during retrieval. For example,
> if a
> > >>>>>>> connection breaks while reading the file, some storage systems
> may not
> > >>>>>>> support resuming reads from the point of failure and start
> reading the file
> > >>>>>>> from the beginning again. This could lead to higher latency,
> repeated
> > >>>>>>> retries, or performance bottlenecks when a partition becomes too
> large or
> > >>>>>>> skewed?
> > >>>>>>>
> > >>>>>>> Would it make sense to introduce a configurable upper-bound on
> the
> > >>>>>>> maximum allowed file size? This might prevent the file from
> growing
> > >>>>>>> massively.
> > >>>>>>> Should the consolidated shuffle file be compressed before being
> > >>>>>>> written to the storage system? Compression might introduce additional
> > >>>>>>> latency, but that too can be a configurable option.
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Rishab Joshi
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Thu, Nov 13, 2025 at 9:14 AM karuppayya <
> [email protected]>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Enrico,
> > >>>>>>>> Thank you very much for reviewing the doc.
> > >>>>>>>>
> > >>>>>>>> *Since the consolidation stage reads all the shuffle data, why
> not
> > >>>>>>>>> doing the transformation in that stage? What is the point in
> deferring the
> > >>>>>>>>> transformations into another stage?*
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> The reason for deferring the final consolidation to a subsequent
> > >>>>>>>> stage lies in the distributed nature of shuffle data.
> > >>>>>>>> A reducer must read all of its corresponding shuffle data, which is
> > >>>>>>>> written across all map tasks. Since each mapper only holds its own
> > >>>>>>>> local output, the consolidation cannot begin until the entire map
> > >>>>>>>> stage completes.
> > >>>>>>>>
> > >>>>>>>> However, your question is also aligned to one of the approaches
> > >>>>>>>> mentioned (concurrent consolidation
> > >>>>>>>> <
> https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf
> >),
> > >>>>>>>> which was specifically considered.
> > >>>>>>>>
> > >>>>>>>> While synchronous consolidation happens after all the data is
> > >>>>>>>> available, concurrent consolidation can begin aggregating and
> > >>>>>>>> persisting the already-generated shuffle data concurrently with the
> > >>>>>>>> remaining map tasks, thereby making the shuffle durable much earlier
> > >>>>>>>> instead of having to wait for all map tasks to complete.
> > >>>>>>>>
> > >>>>>>>> - Karuppayya
> > >>>>>>>>
> > >>>>>>>> On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack <
> [email protected]>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi,
> > >>>>>>>>>
> > >>>>>>>>> another remark regarding a remote shuffle storage solution:
> > >>>>>>>>> As long as the map executors are alive, reduce executors should
> > >>>>>>>>> read from them to avoid any extra delay / overhead.
> > >>>>>>>>> On fetch failure from a map executor, the reduce executors
> should
> > >>>>>>>>> fall back to a remote storage that provides a copy (merged or
> not) of the
> > >>>>>>>>> shuffle data.
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>> Enrico
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On 13.11.25 at 09:42, Enrico Minack wrote:
> > >>>>>>>>>
> > >>>>>>>>> Hi Karuppayya,
> > >>>>>>>>>
> > >>>>>>>>> thanks for your proposal and bringing up this issue.
> > >>>>>>>>>
> > >>>>>>>>> I am very much in favour of a shuffle storage solution that allows
> > >>>>>>>>> for dynamic allocation and node failure in a K8S environment,
> > >>>>>>>>> without the burden of managing a Remote Shuffle Service.
> > >>>>>>>>>
> > >>>>>>>>> I have the following comments:
> > >>>>>>>>>
> > >>>>>>>>> Your proposed consolidation stage is equivalent to the next
> > >>>>>>>>> reducer stage in the sense that it reads shuffle data from the
> earlier map
> > >>>>>>>>> stage. This requires the executors of the map stage to survive
> until the
> > >>>>>>>>> shuffle data are consolidated ("merged" in Spark terminology).
> Therefore, I
> > >>>>>>>>> think this passage of your design document is not accurate:
> > >>>>>>>>>
> > >>>>>>>>>     Executors that perform the initial map tasks (shuffle
> writers)
> > >>>>>>>>> can be immediately deallocated after writing their shuffle
> data ...
> > >>>>>>>>>
> > >>>>>>>>> Since the consolidation stage reads all the shuffle data, why not
> > >>>>>>>>> do the transformation in that stage? What is the point in deferring
> > >>>>>>>>> the transformations into another stage?
> > >>>>>>>>>
> > >>>>>>>>> You mention the "Native Shuffle Block Migration" and say its
> > >>>>>>>>> limitation is "It simply shifts the storage burden to other
> active
> > >>>>>>>>> executors".
> > >>>>>>>>> Please consider that the migration process can migrate to what
> > >>>>>>>>> Spark calls a fallback storage, which essentially copies the
> > >>>>>>>>> shuffle data to a remote storage.
> > >>>>>>>>> Kind regards,
> > >>>>>>>>> Enrico
> > >>>>>>>>>
> > >>>>>>>>> On 13.11.25 at 01:40, karuppayya wrote:
> > >>>>>>>>>
> > >>>>>>>>>  Hi All,
> > >>>>>>>>>
> > >>>>>>>>> I propose to utilize *Remote Storage as a Shuffle Store,
> natively
> > >>>>>>>>> in Spark* .
> > >>>>>>>>>
> > >>>>>>>>> This approach would fundamentally decouple shuffle storage from
> > >>>>>>>>> compute nodes, mitigating *shuffle fetch failures and also help
> > >>>>>>>>> with aggressive downscaling*.
> > >>>>>>>>>
> > >>>>>>>>> The primary goal is to enhance the *elasticity and resilience*
> of
> > >>>>>>>>> Spark workloads, leading to substantial cost optimization
> opportunities.
> > >>>>>>>>>
> > >>>>>>>>> *I welcome any initial thoughts or concerns regarding this
> idea.*
> > >>>>>>>>> *Looking forward to your feedback! *
> > >>>>>>>>>
> > >>>>>>>>> JIRA: SPARK-53484
> > >>>>>>>>> <https://issues.apache.org/jira/browse/SPARK-54327>
> > >>>>>>>>> SPIP doc
> > >>>>>>>>> <
> https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw
> >
> > >>>>>>>>> ,
> > >>>>>>>>> Design doc
> > >>>>>>>>> <
> https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0
> >
> > >>>>>>>>> PoC PR <https://github.com/apache/spark/pull/53028>
> > >>>>>>>>>
> > >>>>>>>>> Thanks,
> > >>>>>>>>> Karuppayya
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>> --
> > >>>>>>> Regards
> > >>>>>>> Rishab Joshi
> > >>>>>>>
> > >>>>>>
> >
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: [email protected]
>
>
