This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 8b7b18134 [CELEBORN-1663] Only register appShuffleDeterminate if stage
using celeborn for shuffle
8b7b18134 is described below
commit 8b7b181348df4a27ead6ddf7329ecff1b3d6656e
Author: Sanskar Modi <[email protected]>
AuthorDate: Tue Oct 22 11:40:06 2024 +0800
[CELEBORN-1663] Only register appShuffleDeterminate if stage using celeborn
for shuffle
Only register appShuffleDeterminate if stage using celeborn for shuffle
Currently we are passing stage info to lifecyclemanager, eventhough it is
not required.
NA
Existing UTs
Closes #2832 from s0nskar/fix_register.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 813b45f2848a1b6e283b733d19343f33f190c793)
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/spark/shuffle/celeborn/SparkShuffleManager.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 1a3e5cca1..4f745b1e3 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -159,10 +159,6 @@ public class SparkShuffleManager implements ShuffleManager
{
appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
initializeLifecycleManager();
- lifecycleManager.registerAppShuffleDeterminate(
- shuffleId,
- dependency.rdd().getOutputDeterministicLevel() !=
DeterministicLevel.INDETERMINATE());
-
if (fallbackPolicyRunner.applyAllFallbackPolicy(
lifecycleManager, dependency.partitioner().numPartitions())) {
if (conf.getBoolean("spark.dynamicAllocation.enabled", false)
@@ -178,6 +174,11 @@ public class SparkShuffleManager implements ShuffleManager
{
sortShuffleIds.add(shuffleId);
return sortShuffleManager().registerShuffle(shuffleId, dependency);
} else {
+ lifecycleManager.registerAppShuffleDeterminate(
+ shuffleId,
+ !DeterministicLevel.INDETERMINATE()
+ .equals(dependency.rdd().getOutputDeterministicLevel()));
+
return new CelebornShuffleHandle<>(
appUniqueId,
lifecycleManager.getHost(),