This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8f6c6d646b45 [SPARK-52923][CORE] Allow ShuffleManager to control push
merge during shuffle registration
8f6c6d646b45 is described below
commit 8f6c6d646b4596877bae111858227653d42ec51d
Author: gaoyajun02 <[email protected]>
AuthorDate: Sat Nov 29 21:14:51 2025 -0600
[SPARK-52923][CORE] Allow ShuffleManager to control push merge during
shuffle registration
### What changes were proposed in this pull request?
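This PR moves the `shuffleManager.registerShuffle()` call in
`ShuffleDependency` so that it runs after `_shuffleMergeAllowed` has been
initialized. A `ShuffleManager` can then call `setShuffleMergeAllowed()`
during registration without the field initializer overwriting its choice.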
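
Why the ordering matters can be sketched with a small standalone model.
Scala runs the statements in a class body in declaration order, so a value
written by a callback during registration is lost if the field initializer
runs afterwards. The `Manager`, `Dep`, `DepBefore`, `DepAfter`, and
`DisablingManager` names below are hypothetical stand-ins, not Spark
classes, and the flag is initialized to `true` only for illustration.

```scala
// Hypothetical stand-ins for illustration; these are NOT Spark's classes.
trait Manager {
  def register(dep: Dep): String
}

abstract class Dep(val id: Int) {
  def mergeAllowed: Boolean
  def setMergeAllowed(allowed: Boolean): Unit
}

// A manager that turns merge off for every shuffle it registers.
object DisablingManager extends Manager {
  def register(dep: Dep): String = {
    dep.setMergeAllowed(false)
    s"handle-${dep.id}"
  }
}

// Old order: registration runs first, then the flag initializer runs
// and overwrites whatever the manager set.
class DepBefore(id: Int, manager: Manager) extends Dep(id) {
  val handle: String = manager.register(this)
  private var _mergeAllowed = true
  def mergeAllowed: Boolean = _mergeAllowed
  def setMergeAllowed(allowed: Boolean): Unit = { _mergeAllowed = allowed }
}

// New order: the flag is initialized first, so the manager's choice sticks.
class DepAfter(id: Int, manager: Manager) extends Dep(id) {
  private var _mergeAllowed = true
  val handle: String = manager.register(this)
  def mergeAllowed: Boolean = _mergeAllowed
  def setMergeAllowed(allowed: Boolean): Unit = { _mergeAllowed = allowed }
}

object InitOrderDemo {
  def main(args: Array[String]): Unit = {
    println(new DepBefore(0, DisablingManager).mergeAllowed) // true: choice lost
    println(new DepAfter(1, DisablingManager).mergeAllowed)  // false: choice kept
  }
}
```

In this model only the position of the `handle` line differs between the
two classes, which mirrors the shape of the change in this commit.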
### Why are the changes needed?
While `spark.shuffle.push.enabled` provides global control over push-based
shuffle, there are scenarios requiring more granular control:
- Large-scale migrations of Spark applications, where different jobs may
need different shuffle strategies
- Remote shuffle managers (e.g. Celeborn/Uniffle) need the ability to fall
back to push-based shuffle on a per-shuffle basis
- Dynamic decision-making based on shuffle characteristics during shuffle
registration
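
A per-shuffle decision of this kind might look roughly like the following
standalone sketch. Everything in it is an assumption made for illustration:
`ShuffleInfo`, `Handle`, `FallbackAwareManager`, the health check, and the
partition limit are hypothetical and are not part of Spark, Celeborn, or
Uniffle. In Spark itself `setShuffleMergeAllowed` is `private[spark]`, so a
real manager would have to live under the `org.apache.spark` package to
call it.

```scala
// Hypothetical model of a shuffle dependency; not Spark's ShuffleDependency.
final class ShuffleInfo(val shuffleId: Int, val numPartitions: Int) {
  // Models the merge flag when push-based shuffle is enabled globally.
  private var mergeAllowed = true
  def shuffleMergeAllowed: Boolean = mergeAllowed
  def setShuffleMergeAllowed(allowed: Boolean): Unit = { mergeAllowed = allowed }
}

sealed trait Handle
final case class RemoteHandle(shuffleId: Int) extends Handle
final case class LocalHandle(shuffleId: Int) extends Handle

// Hypothetical policy: use the remote service while it is healthy and the
// shuffle is small enough; otherwise fall back and leave push merge allowed.
final class FallbackAwareManager(remoteHealthy: () => Boolean, maxRemotePartitions: Int) {
  def registerShuffle(info: ShuffleInfo): Handle = {
    if (remoteHealthy() && info.numPartitions <= maxRemotePartitions) {
      info.setShuffleMergeAllowed(false) // the remote service handles this shuffle
      RemoteHandle(info.shuffleId)
    } else {
      LocalHandle(info.shuffleId) // fallback: merge flag stays as initialized
    }
  }
}

object FallbackDemo {
  def main(args: Array[String]): Unit = {
    val manager = new FallbackAwareManager(() => true, maxRemotePartitions = 1000)
    val small = new ShuffleInfo(0, 200)
    val large = new ShuffleInfo(1, 5000)
    val smallHandle = manager.registerShuffle(small)
    val largeHandle = manager.registerShuffle(large)
    println(s"$smallHandle mergeAllowed=${small.shuffleMergeAllowed}")
    println(s"$largeHandle mergeAllowed=${large.shuffleMergeAllowed}")
  }
}
```

The decision is made inside `registerShuffle`, which only has a lasting
effect if the merge flag has already been initialized when registration
runs.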
### Does this PR introduce _any_ user-facing change?
No. This is an internal refactoring that maintains backward compatibility;
the default behavior remains unchanged.
### How was this patch tested?
- Existing unit tests continue to pass
- The change only affects the order of initialization, not the logic
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #51629 from gaoyajun02/SPARK-52923.
Authored-by: gaoyajun02 <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
core/src/main/scala/org/apache/spark/Dependency.scala | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index c436025e06bb..8738298fed0e 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -127,15 +127,15 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
val shuffleId: Int = _rdd.context.newShuffleId()
- val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
- shuffleId, this)
-
private[this] val numPartitions = rdd.partitions.length
// By default, shuffle merge is allowed for ShuffleDependency if push based shuffle
// is enabled
private[this] var _shuffleMergeAllowed = canShuffleMergeBeEnabled()
+ val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
+ shuffleId, this)
+
private[spark] def setShuffleMergeAllowed(shuffleMergeAllowed: Boolean): Unit = {
_shuffleMergeAllowed = shuffleMergeAllowed
}