This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 610f40b39 [CELEBORN-1855] LifecycleManager return appshuffleId for non 
barrier stage when fetch fail has been reported
610f40b39 is described below

commit 610f40b392a2d64518cde713e3b31f296ec7baf0
Author: lijianfu03 <[email protected]>
AuthorDate: Tue May 13 16:14:03 2025 +0800

    [CELEBORN-1855] LifecycleManager return appshuffleId for non barrier stage 
when fetch fail has been reported
    
    For a non-barrier shuffle read stage,
    LifecycleManager#handleGetShuffleIdForApp now always returns the
    appShuffleId, regardless of whether its fetch status is true or false.
    
    As described in
    [jira](https://issues.apache.org/jira/browse/CELEBORN-1855), if
    LifecycleManager only returns an appShuffleId whose fetch status is success,
    the task fails immediately with "there is no finished map stage associated
    with", even though the previously reported fetch-failure event may not be
    fatal. Returning the appShuffleId anyway gives the task a chance to succeed.
    
    (cherry picked from commit 045411ac34c749d8ca2d7bed8d0b9c3554f71287)
    Signed-off-by: Shuang <[email protected]>
---
 .../apache/spark/shuffle/celeborn/SparkUtils.java  | 17 ++++++++++++-----
 .../shuffle/celeborn/CelebornShuffleReader.scala   | 22 +++++++++++++++++++---
 .../apache/spark/shuffle/celeborn/SparkUtils.java  | 17 ++++++++++++-----
 .../shuffle/celeborn/CelebornShuffleReader.scala   | 22 ++++++++++++++++++++--
 .../apache/celeborn/client/DummyShuffleClient.java |  6 ++++--
 .../org/apache/celeborn/client/ShuffleClient.java  |  4 +++-
 .../apache/celeborn/client/ShuffleClientImpl.java  |  7 ++++---
 .../apache/celeborn/client/LifecycleManager.scala  | 18 ++++++++++++------
 common/src/main/proto/TransportMessages.proto      |  1 +
 9 files changed, 87 insertions(+), 27 deletions(-)

diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index 4f38c9815..6cc4dc5b8 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.client.ShuffleClient;
 import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornRuntimeException;
 import org.apache.celeborn.common.util.Utils;
 
 public class SparkUtils {
@@ -131,11 +132,17 @@ public class SparkUtils {
       Boolean isWriter) {
     if (handle.throwsFetchFailure()) {
       String appShuffleIdentifier = 
getAppShuffleIdentifier(handle.shuffleId(), context);
-      return client.getShuffleId(
-          handle.shuffleId(),
-          appShuffleIdentifier,
-          isWriter,
-          context instanceof BarrierTaskContext);
+      Tuple2<Integer, Boolean> res =
+          client.getShuffleId(
+              handle.shuffleId(),
+              appShuffleIdentifier,
+              isWriter,
+              context instanceof BarrierTaskContext);
+      if (!res._2) {
+        throw new CelebornRuntimeException(String.format("Get invalid shuffle 
id %s", res._1));
+      } else {
+        return res._1;
+      }
     } else {
       return handle.shuffleId();
     }
diff --git 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 4a3092275..890971b45 100644
--- 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -32,7 +32,7 @@ import org.apache.celeborn.client.ShuffleClient
 import org.apache.celeborn.client.read.CelebornInputStream
 import org.apache.celeborn.client.read.MetricsCallback
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.exception.{CelebornIOException, 
PartitionUnRetryAbleException}
+import org.apache.celeborn.common.exception.{CelebornIOException, 
CelebornRuntimeException, PartitionUnRetryAbleException}
 import org.apache.celeborn.common.util.ThreadUtils
 
 class CelebornShuffleReader[K, C](
@@ -61,8 +61,24 @@ class CelebornShuffleReader[K, C](
   override def read(): Iterator[Product2[K, C]] = {
 
     val serializerInstance = dep.serializer.newInstance()
-
-    val shuffleId = SparkUtils.celebornShuffleId(shuffleClient, handle, 
context, false)
+    val shuffleId =
+      try {
+        SparkUtils.celebornShuffleId(shuffleClient, handle, context, false)
+      } catch {
+        case e: CelebornRuntimeException =>
+          logError(s"Failed to get shuffleId for appShuffleId 
${handle.shuffleId}", e)
+          if (handle.throwsFetchFailure) {
+            throw new FetchFailedException(
+              null,
+              handle.shuffleId,
+              -1,
+              startPartition,
+              SparkUtils.FETCH_FAILURE_ERROR_MSG + handle.shuffleId,
+              e)
+          } else {
+            throw e
+          }
+      }
     shuffleIdTracker.track(handle.shuffleId, shuffleId)
     logDebug(
       s"get shuffleId $shuffleId for appShuffleId ${handle.shuffleId} 
attemptNum ${context.stageAttemptNumber()}")
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index 47317474e..638d14149 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.client.ShuffleClient;
 import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornRuntimeException;
 import org.apache.celeborn.reflect.DynConstructors;
 import org.apache.celeborn.reflect.DynFields;
 import org.apache.celeborn.reflect.DynMethods;
@@ -108,11 +109,17 @@ public class SparkUtils {
       Boolean isWriter) {
     if (handle.throwsFetchFailure()) {
       String appShuffleIdentifier = 
getAppShuffleIdentifier(handle.shuffleId(), context);
-      return client.getShuffleId(
-          handle.shuffleId(),
-          appShuffleIdentifier,
-          isWriter,
-          context instanceof BarrierTaskContext);
+      Tuple2<Integer, Boolean> res =
+          client.getShuffleId(
+              handle.shuffleId(),
+              appShuffleIdentifier,
+              isWriter,
+              context instanceof BarrierTaskContext);
+      if (!res._2) {
+        throw new CelebornRuntimeException(String.format("Get invalid shuffle 
id %s", res._1));
+      } else {
+        return res._1;
+      }
     } else {
       return handle.shuffleId();
     }
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 99a5bb0fc..0433401ca 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -39,7 +39,7 @@ import org.apache.celeborn.client.{DummyShuffleClient, 
ShuffleClient}
 import org.apache.celeborn.client.ShuffleClientImpl.ReduceFileGroups
 import org.apache.celeborn.client.read.{CelebornInputStream, MetricsCallback}
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.exception.{CelebornIOException, 
PartitionUnRetryAbleException}
+import org.apache.celeborn.common.exception.{CelebornIOException, 
CelebornRuntimeException, PartitionUnRetryAbleException}
 import org.apache.celeborn.common.network.client.TransportClient
 import org.apache.celeborn.common.network.protocol.TransportMessage
 import org.apache.celeborn.common.protocol._
@@ -77,7 +77,25 @@ class CelebornShuffleReader[K, C](
 
     val startTime = System.currentTimeMillis()
     val serializerInstance = newSerializerInstance(dep)
-    val shuffleId = SparkUtils.celebornShuffleId(shuffleClient, handle, 
context, false)
+    val shuffleId =
+      try {
+        SparkUtils.celebornShuffleId(shuffleClient, handle, context, false)
+      } catch {
+        case e: CelebornRuntimeException =>
+          logError(s"Failed to get shuffleId for appShuffleId 
${handle.shuffleId}", e)
+          if (throwsFetchFailure) {
+            throw new FetchFailedException(
+              null,
+              handle.shuffleId,
+              -1,
+              -1,
+              startPartition,
+              SparkUtils.FETCH_FAILURE_ERROR_MSG + handle.shuffleId + "/" + 
handle.shuffleId,
+              e)
+          } else {
+            throw e
+          }
+      }
     shuffleIdTracker.track(handle.shuffleId, shuffleId)
     logDebug(
       s"get shuffleId $shuffleId for appShuffleId ${handle.shuffleId} 
attemptNum ${context.stageAttemptNumber()}")
diff --git 
a/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java 
b/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java
index e2f60aee9..5ecc3a5fe 100644
--- a/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java
@@ -31,6 +31,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import scala.Tuple2;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -176,9 +178,9 @@ public class DummyShuffleClient extends ShuffleClient {
   }
 
   @Override
-  public int getShuffleId(
+  public Tuple2<Integer, Boolean> getShuffleId(
       int appShuffleId, String appShuffleIdentifier, boolean isWriter, boolean 
isBarrierStage) {
-    return appShuffleId;
+    return Tuple2.apply(appShuffleId, true);
   }
 
   @Override
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index b4f96a0e0..8154ca57b 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.LongAdder;
 
+import scala.Tuple2;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -266,7 +268,7 @@ public abstract class ShuffleClient {
 
   public abstract PushState getPushState(String mapKey);
 
-  public abstract int getShuffleId(
+  public abstract Tuple2<Integer, Boolean> getShuffleId(
       int appShuffleId, String appShuffleIdentifier, boolean isWriter, boolean 
isBarrierStage);
 
   /**
diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index b22cce93d..43f658988 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -100,7 +100,7 @@ public class ShuffleClientImpl extends ShuffleClient {
   protected byte[] extension;
 
   // key: appShuffleIdentifier, value: shuffleId
-  protected Map<String, Integer> shuffleIdCache = 
JavaUtils.newConcurrentHashMap();
+  protected Map<String, Tuple2<Integer, Boolean>> shuffleIdCache = 
JavaUtils.newConcurrentHashMap();
 
   // key: shuffleId, value: (partitionId, PartitionLocation)
   final Map<Integer, ConcurrentHashMap<Integer, PartitionLocation>> 
reducePartitionMap =
@@ -591,7 +591,7 @@ public class ShuffleClientImpl extends ShuffleClient {
   }
 
   @Override
-  public int getShuffleId(
+  public Tuple2<Integer, Boolean> getShuffleId(
       int appShuffleId, String appShuffleIdentifier, boolean isWriter, boolean 
isBarrierStage) {
     return shuffleIdCache.computeIfAbsent(
         appShuffleIdentifier,
@@ -608,7 +608,8 @@ public class ShuffleClientImpl extends ShuffleClient {
                   pbGetShuffleId,
                   conf.clientRpcRegisterShuffleAskTimeout(),
                   ClassTag$.MODULE$.apply(PbGetShuffleIdResponse.class));
-          return pbGetShuffleIdResponse.getShuffleId();
+          return Tuple2.apply(
+              pbGetShuffleIdResponse.getShuffleId(), 
pbGetShuffleIdResponse.getSuccess());
         });
   }
 
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 9630bbba0..814fbe922 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -827,7 +827,8 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     if (shuffleIds == null) {
       logWarning(s"unknown appShuffleId $appShuffleId, maybe no shuffle data 
for this shuffle")
       val pbGetShuffleIdResponse =
-        
PbGetShuffleIdResponse.newBuilder().setShuffleId(UNKNOWN_APP_SHUFFLE_ID).build()
+        
PbGetShuffleIdResponse.newBuilder().setShuffleId(UNKNOWN_APP_SHUFFLE_ID).setSuccess(
+          true).build()
       context.reply(pbGetShuffleIdResponse)
       return
     }
@@ -841,7 +842,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         shuffleIds.get(appShuffleIdentifier) match {
           case Some((shuffleId, _)) =>
             val pbGetShuffleIdResponse =
-              
PbGetShuffleIdResponse.newBuilder().setShuffleId(shuffleId).build()
+              
PbGetShuffleIdResponse.newBuilder().setShuffleId(shuffleId).setSuccess(true).build()
             context.reply(pbGetShuffleIdResponse)
           case None =>
             Option(appShuffleDeterminateMap.get(appShuffleId)).map { 
determinate =>
@@ -874,7 +875,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
                   newShuffleId
                 }
               val pbGetShuffleIdResponse =
-                
PbGetShuffleIdResponse.newBuilder().setShuffleId(shuffleId).build()
+                
PbGetShuffleIdResponse.newBuilder().setShuffleId(shuffleId).setSuccess(true).build()
               context.reply(pbGetShuffleIdResponse)
             }.orElse(
               throw new UnsupportedOperationException(
@@ -887,12 +888,17 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
             val pbGetShuffleIdResponse = {
               logDebug(
                 s"get shuffleId $shuffleId for appShuffleId $appShuffleId 
appShuffleIdentifier $appShuffleIdentifier isWriter $isWriter")
-              
PbGetShuffleIdResponse.newBuilder().setShuffleId(shuffleId).build()
+              
PbGetShuffleIdResponse.newBuilder().setShuffleId(shuffleId).setSuccess(true).build()
             }
             context.reply(pbGetShuffleIdResponse)
           case None =>
-            throw new UnsupportedOperationException(
-              s"unexpected! there is no finished map stage associated with 
appShuffleId $appShuffleId")
+            val pbGetShuffleIdResponse = {
+              logInfo(
+                s"there is no finished map stage associated with appShuffleId 
$appShuffleId")
+              
PbGetShuffleIdResponse.newBuilder().setShuffleId(UNKNOWN_APP_SHUFFLE_ID).setSuccess(
+                false).build()
+            }
+            context.reply(pbGetShuffleIdResponse)
         }
       }
     }
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index 90040663f..72f16314e 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -375,6 +375,7 @@ message PbGetShuffleId {
 
 message PbGetShuffleIdResponse {
   int32 shuffleId = 1;
+  bool success = 2;
 }
 
 message PbReportShuffleFetchFailure {

Reply via email to