This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 8b7b18134 [CELEBORN-1663] Only register appShuffleDeterminate if stage 
using celeborn for shuffle
8b7b18134 is described below

commit 8b7b181348df4a27ead6ddf7329ecff1b3d6656e
Author: Sanskar Modi <[email protected]>
AuthorDate: Tue Oct 22 11:40:06 2024 +0800

    [CELEBORN-1663] Only register appShuffleDeterminate if stage using celeborn 
for shuffle
    
    Only register appShuffleDeterminate if stage using celeborn for shuffle
    
    Currently we are passing stage info to lifecyclemanager, eventhough it is 
not required.
    
    NA
    
    Existing UTs
    
    Closes #2832 from s0nskar/fix_register.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit 813b45f2848a1b6e283b733d19343f33f190c793)
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/spark/shuffle/celeborn/SparkShuffleManager.java   | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 1a3e5cca1..4f745b1e3 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -159,10 +159,6 @@ public class SparkShuffleManager implements ShuffleManager 
{
     appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
     initializeLifecycleManager();
 
-    lifecycleManager.registerAppShuffleDeterminate(
-        shuffleId,
-        dependency.rdd().getOutputDeterministicLevel() != 
DeterministicLevel.INDETERMINATE());
-
     if (fallbackPolicyRunner.applyAllFallbackPolicy(
         lifecycleManager, dependency.partitioner().numPartitions())) {
       if (conf.getBoolean("spark.dynamicAllocation.enabled", false)
@@ -178,6 +174,11 @@ public class SparkShuffleManager implements ShuffleManager 
{
       sortShuffleIds.add(shuffleId);
       return sortShuffleManager().registerShuffle(shuffleId, dependency);
     } else {
+      lifecycleManager.registerAppShuffleDeterminate(
+          shuffleId,
+          !DeterministicLevel.INDETERMINATE()
+              .equals(dependency.rdd().getOutputDeterministicLevel()));
+
       return new CelebornShuffleHandle<>(
           appUniqueId,
           lifecycleManager.getHost(),

Reply via email to