Hi everyone,
Thank you all for your valuable comments and discussion on the design
document and in this email thread. I have replied to the comments and
concerns raised, and I welcome any further questions and challenges.
Also, *Sun Chao* has accepted to shepherd this proposal (thank you!).
If there are no other open questions by Wednesday morning (PST), I
will request Chao to open the official voting thread (which, per the
process <https://spark.apache.org/improvement-proposals.html>, runs
for 72 hours).
- Karuppayya
On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua
<[email protected]> wrote:
One aspect that hasn’t been mentioned yet (or so I think) is the
thread-level behavior of shuffle. In large workloads with many
small shuffle blocks, I’ve repeatedly observed executors spawning
hundreds of threads tied to shuffle fetch operations, Netty client
handlers, and block file access.
Since the proposal changes block granularity and fetch patterns,
it would be valuable to explicitly consider how the consolidation
stage affects:
– the number of concurrent fetch operations
– thread pool growth / saturation
– Netty transport threads
– memory pressure from large in-flight reads
Btw, I find your proposal quite interesting.
On Tue, Nov 18, 2025, 19:33, karuppayya <[email protected]>
wrote:
Rishab, Wenchen, Mridul,
Thank you very much for taking the time to review the proposal
and for providing such thoughtful and insightful
comments/questions.
*Rishab*,
/* suitable across all storage systems*/
You are right that the suitability is somewhat subjective and
dependent on the cloud provider used.
In general, the goal of ShuffleVault is to utilize the
standard Hadoop FileSystem APIs, which means it should work
seamlessly with popular cloud and distributed file systems
(like S3, HDFS, GFS, etc.).
These systems share similar characteristics and are designed to
handle large files efficiently.
/*large file could create problems during retrieval.*/
We mitigate this risk by ensuring that tasks do not read the
entire consolidated file at once. Instead, the implementation
reads the data in blocks of a configurable size rather than
relying on a single read. *This behavior can be refined and
validated further* to make it more robust.
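To illustrate the idea, here is a minimal sketch (not the PR code;
the chunk size, method name, and path handling are placeholders) of
reading a large consolidated file in bounded blocks via the standard
Hadoop FileSystem API:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Minimal sketch: read a large consolidated shuffle file in fixed-size
// blocks instead of one continuous read. The 8 MiB block size is an
// arbitrary placeholder, not a value from the proposal.
def readInBlocks(path: Path, blockSize: Int = 8 * 1024 * 1024)
                (consume: (Array[Byte], Int) => Unit): Unit = {
  val fs = path.getFileSystem(new Configuration())
  val fileLen = fs.getFileStatus(path).getLen
  val in = fs.open(path)
  try {
    val buffer = new Array[Byte](blockSize)
    var pos = 0L
    while (pos < fileLen) {
      val toRead = math.min(blockSize.toLong, fileLen - pos).toInt
      // Positioned read: if a connection breaks, a retry can resume
      // from `pos` instead of re-reading the whole file.
      in.readFully(pos, buffer, 0, toRead)
      consume(buffer, toRead)
      pos += toRead
    }
  } finally {
    in.close()
  }
}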
*Wenchen,*
I fully agree that the operational details around using cloud
storage for shuffle (specifically traffic throttling, cleanup
guarantees, and overall request-related network cost) are
critical issues that must be solved.
The consolidation stage is explicitly designed to mitigate the
throttling and the accompanying cost issues.
*Throttling* - By consolidating shuffle data, this approach
transforms the read pattern from a multitude of small, random
requests into fewer, large, targeted ones, which is particularly
beneficial for modern cloud object storage.
*Shuffle cleanup* - I am actively working to leverage the
existing shuffle cleanup mechanisms and to make them more
robust. These cleanup improvements should be beneficial
regardless of this proposal and should cover most cases.
However, I agree that to ensure *no orphaned files* remain, we
will still require other means (such as remote storage
lifecycle policies or job-specific scripts) for guaranteed
cleanup.
Thank you again for your valuable feedback, especially the
validation on synchronous scheduling and AQE integration.
*Mridul,*
/*Doing an explicit sort stage*/
To clarify, ShuffleVault does not introduce an explicit sort
stage. Instead, it introduces a Shuffle Consolidation Stage.
This stage is a pure passthrough operation that only
aggregates scattered shuffle data for a given reducer partition.
In simple terms, it functions as an additional reducer stage
that reads the fragmented shuffle files from the mappers and
writes them as a single, consolidated, durable file to remote
storage.
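Conceptually, the per-partition consolidation amounts to something
like the following rough sketch (not the actual PR code; paths,
metadata, and error handling are simplified):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IOUtils

// Rough sketch of the consolidation task for one reducer partition:
// concatenate the per-mapper shuffle fragments into a single durable
// file on remote storage. A pure byte-level passthrough, no sorting.
def consolidatePartition(fragments: Seq[Path], dest: Path): Unit = {
  val fs = dest.getFileSystem(new Configuration())
  val out = fs.create(dest, true)  // overwrite if present
  try {
    fragments.foreach { fragment =>
      val in = fragment.getFileSystem(fs.getConf).open(fragment)
      try {
        // Copy raw bytes; keep the output stream open across fragments.
        IOUtils.copyBytes(in, out, 4096, false)
      } finally {
        in.close()
      }
    }
  } finally {
    out.close()
  }
}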
*but that would be a nontrivial change*
I agree that the change is significant; I am actively working
to ensure the benefits are leveraged across the stack. This PR
<https://github.com/apache/spark/pull/53028> demonstrates
integration with AQE and interactions with other
rules (Exchange reuse, Shuffle Partition Coalescing, etc.).
I would genuinely appreciate it if you could take a look at
the POC PR to see the scope of changes. The primary logic is
encapsulated within a new Spark Physical Planner Rule
<https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6>
that injects the consolidation stage, which is the crux of the change.
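For a rough sense of the shape of such a rule (the names below are
illustrative, not the actual PR code), it pattern-matches on shuffle
exchanges and wraps them with a consolidation node:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Hypothetical node: passes rows through unchanged; the real node
// would persist consolidated shuffle files to remote storage as a
// side effect of execution.
case class ConsolidateShuffleExec(child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output
  override def outputPartitioning: Partitioning = child.outputPartitioning
  override protected def doExecute(): RDD[InternalRow] = child.execute()
  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}

// Hypothetical rule: wrap every shuffle exchange so downstream
// reducers read consolidated files instead of scattered per-mapper blocks.
object InjectShuffleConsolidation extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case exchange: ShuffleExchangeExec => ConsolidateShuffleExec(exchange)
  }
}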
I welcome any further questions or comments!
Thanks & Regards
Karuppayya
On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan
<[email protected]> wrote:
There are existing Apache projects that provide capabilities
which largely address this problem statement: Apache Celeborn,
Apache Uniffle, Zeus, etc.
Doing an explicit sort stage between "map" and "reduce"
brings with it some nice advantages, especially if the
output is durable, but that would be a nontrivial change,
and it should be attempted only if the benefits are being
leveraged throughout the stack (AQE, speculative
execution, etc.).
Regards,
Mridul
On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan
<[email protected]> wrote:
Hi Karuppayya,
Handling large shuffles in Spark is challenging and
it's great to see proposals addressing it. I think the
extra "shuffle consolidation stage" is a good idea,
and now I feel it's better for it to be synchronous,
so that we can integrate it with AQE and leverage the
accurate runtime shuffle status to make decisions
about whether or not to launch this extra "shuffle
consolidation stage" and how to consolidate. This is a
key differentiator compared to the push-based shuffle.
However, there are many details to consider, and in
general it's difficult to use cloud storage for
shuffle. We need to deal with problems like traffic
throttling, cleanup guarantees, cost control, and so
on. Let's take a step back and look at the actual
problems of large shuffles.
A large shuffle usually starts with a large number of
mappers that we can't adjust (e.g., a large table scan).
We can adjust the number of reducers to meet two goals:
1. The input data size of each reducer shouldn't be
too large, which is roughly *total_shuffle_size /
num_reducers*. This is to avoid spilling/OOM during
reducer task execution.
2. The data size of each shuffle block shouldn't be too
small, which is roughly *total_shuffle_size /
(num_mappers * num_reducers)*. This keeps disk/network
IO efficient.
These two goals are actually contradictory, and
sometimes we have to prioritize goal 1 (i.e. pick a
large *num_reducers*) so that the query can finish. An
extra "shuffle consolidation stage" can effectively
decrease the number of mappers by merging the shuffle
files from multiple mappers. This can be a clear win,
as fetching many small shuffle blocks can be quite
slow, even slower than running an extra "shuffle
consolidation stage".
In addition, there shouldn't be too many nodes hosting
shuffle files (ideally zero, meaning shuffle files are
stored in separate storage). With a large number of
mappers, likely every node in the cluster stores some
shuffle files. By merging shuffle files via the extra
"shuffle consolidation stage", we can decrease the
number of nodes that host active shuffle data, so that
the cluster is more elastic.
Thanks,
Wenchen
On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi
<[email protected]> wrote:
Hi Karuppayya,
Thanks for sharing the proposal; this looks very
exciting. I have a few questions, and please correct
me if I have misunderstood anything.
Would it be possible to clarify whether the
consolidated shuffle file produced for each
partition is suitable across all storage systems,
especially when this file becomes extremely large?
I am wondering if a very large file could create
problems during retrieval. For example, if a
connection breaks while reading the file, some
storage systems may not support resuming reads
from the point of failure and may restart reading
the file from the beginning. This could lead to
higher latency, repeated retries, or performance
bottlenecks when a partition becomes too large or
skewed.
Would it make sense to introduce a configurable
upper bound on the maximum allowed file size? This
might prevent the file from growing massively.
Should the consolidated shuffle file be compressed
before being written to the storage system?
Compression might introduce additional latency, but
that too could be a configurable option.
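Hypothetically, both knobs could be surfaced as configuration,
something like the following (assuming a SparkSession named
spark; neither key exists today, the names are made up):

// Made-up config keys, shown only to illustrate the suggestion.
spark.conf.set("spark.shuffle.consolidation.maxFileSize", "4g")
spark.conf.set("spark.shuffle.consolidation.compression.enabled", "true")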
Regards,
Rishab Joshi
On Thu, Nov 13, 2025 at 9:14 AM karuppayya
<[email protected]> wrote:
Enrico,
Thank you very much for reviewing the doc.
/Since the consolidation stage reads all
the shuffle data, why not do the
transformation in that stage? What is the
point of deferring the transformations
to another stage?/
The reason for deferring the final
consolidation to a subsequent stage lies in
the distributed nature of shuffle data.
A reducer requires reading all of its
corresponding shuffle data, which is written
across all map tasks. Since each mapper only
holds its own local output, the consolidation
cannot begin until the entire map stage completes.
However, your question is also aligned with one
of the approaches mentioned (concurrent
consolidation
<https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>),
which was specifically considered.
While synchronous consolidation happens
after all the data is available, concurrent
consolidation can begin aggregating and
persisting the already-generated shuffle data
while the remaining map tasks are still running,
thereby making the shuffle durable much
earlier instead of having to wait for all map
tasks to complete.
- Karuppayya
On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack
<[email protected]> wrote:
Hi,
another remark regarding a remote shuffle
storage solution:
As long as the map executors are alive,
reduce executors should read from them to
avoid any extra delay/overhead.
On fetch failure from a map executor, the
reduce executors should fall back to a
remote storage that provides a copy
(merged or not) of the shuffle data.
Cheers,
Enrico
On 13.11.25 at 09:42, Enrico Minack wrote:
Hi Karuppayya,
thanks for your proposal and bringing up
this issue.
I am very much in favour of a shuffle
storage solution that allows for dynamic
allocation and node failure in a K8S
environment, without the burden of
managing a Remote Shuffle Service.
I have the following comments:
Your proposed consolidation stage is
equivalent to the next reducer stage in
the sense that it reads shuffle data from
the earlier map stage. This requires the
executors of the map stage to survive
until the shuffle data are consolidated
("merged" in Spark terminology).
Therefore, I think this passage of your
design document is not accurate:
Executors that perform the initial
map tasks (shuffle writers) can be
immediately deallocated after writing
their shuffle data ...
Since the consolidation stage reads all
the shuffle data, why not do the
transformation in that stage? What is the
point of deferring the transformations
to another stage?
You mention the "Native Shuffle Block
Migration" and say its limitation is "It
simply shifts the storage burden to other
active executors".
Please consider that the migration
process can migrate to what Spark calls
a fallback storage, which essentially
copies the shuffle data to remote storage.
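For reference, enabling that path today looks roughly like
this (the s3a location is a placeholder):

import org.apache.spark.SparkConf

// Shuffle blocks of decommissioned executors get migrated to the
// fallback storage; the bucket path below is a placeholder.
val conf = new SparkConf()
  .set("spark.decommission.enabled", "true")
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
  .set("spark.storage.decommission.fallbackStorage.path",
    "s3a://my-bucket/spark-fallback/")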
Kind regards,
Enrico
On 13.11.25 at 01:40, karuppayya wrote:
Hi All,
I propose to utilize *Remote Storage as
a Shuffle Store, natively in Spark*.
This approach would fundamentally
decouple shuffle storage from compute
nodes, mitigating *shuffle fetch
failures* and helping with
*aggressive downscaling*.
The primary goal is to enhance the
*elasticity and resilience* of Spark
workloads, leading to substantial cost
optimization opportunities.
*I welcome any initial thoughts or
concerns regarding this idea.*
*Looking forward to your feedback!*
JIRA: SPARK-53484
<https://issues.apache.org/jira/browse/SPARK-54327>
SPIP doc
<https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>,
Design doc
<https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
PoC PR
<https://github.com/apache/spark/pull/53028>
Thanks,
Karuppayya