This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 87b1ee20e [CELEBORN-1479] Report register shuffle failed reason in 
exception
87b1ee20e is described below

commit 87b1ee20e9a8fcca4cc81968356b28e90e338853
Author: jiang13021 <[email protected]>
AuthorDate: Tue Jul 9 17:12:23 2024 +0800

    [CELEBORN-1479] Report register shuffle failed reason in exception
    
    ### What changes were proposed in this pull request?
    Add `ConcurrentHashMap<Integer, Exception> registerShuffleExceptions` to 
ShuffleClientImpl to record the reason a shuffle registration failed.
    
    ### Why are the changes needed?
    There could be various reasons for a Register Shuffle failure, such as 
SLOT_NOT_AVAILABLE, RESERVE_SLOTS_FAILED, and so on. However, the current 
exceptions only indicate that a shuffle registration has failed without 
providing details on the cause of the failure. We are unable to determine the 
exact reason for the failure unless we check the LifecycleManager logs.
    In this PR, the actual reason for a register shuffle failure is included in 
the thrown exception.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    unit test: 
org.apache.celeborn.client.ShuffleClientSuiteJ#testRegisterShuffleFailed
    
    Closes #2590 from jiang13021/register_shuffle_failed_reason.
    
    Authored-by: jiang13021 <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit bc18c9ae392ce8a0db1fa5f79540af9163ea7b41)
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/client/ShuffleClient.java  |  2 +-
 .../apache/celeborn/client/ShuffleClientImpl.java  | 44 +++++++++------
 .../celeborn/client/ShuffleClientSuiteJ.java       | 65 +++++++++++++++++++++-
 3 files changed, 93 insertions(+), 18 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index aef173a62..0ea484deb 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -262,7 +262,7 @@ public abstract class ShuffleClient {
       int shuffleId, int numMappers, int mapId, int attemptId, int 
partitionId) throws IOException;
 
   public abstract ConcurrentHashMap<Integer, PartitionLocation> 
getPartitionLocation(
-      int shuffleId, int numMappers, int numPartitions);
+      int shuffleId, int numMappers, int numPartitions) throws 
CelebornIOException;
 
   public abstract PushState getPushState(String mapKey);
 
diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 1b22a7a62..e8770c537 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -514,7 +514,7 @@ public class ShuffleClientImpl extends ShuffleClient {
   }
 
   private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
-      int shuffleId, int numMappers, int numPartitions) {
+      int shuffleId, int numMappers, int numPartitions) throws 
CelebornIOException {
     return registerShuffleInternal(
         shuffleId,
         numMappers,
@@ -548,18 +548,29 @@ public class ShuffleClientImpl extends ShuffleClient {
                     conf.clientRpcRegisterShuffleAskTimeout(),
                     ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
 
-    if (partitionLocationMap == null) {
-      throw new CelebornIOException("Register shuffle failed for shuffle " + 
shuffleId);
-    }
-
     return partitionLocationMap.get(partitionId);
   }
 
   @Override
   public ConcurrentHashMap<Integer, PartitionLocation> getPartitionLocation(
-      int shuffleId, int numMappers, int numPartitions) {
-    return reducePartitionMap.computeIfAbsent(
-        shuffleId, (id) -> registerShuffle(shuffleId, numMappers, 
numPartitions));
+      int shuffleId, int numMappers, int numPartitions) throws 
CelebornIOException {
+    try {
+      return reducePartitionMap.computeIfAbsent(
+          shuffleId,
+          (id) -> {
+            try {
+              return registerShuffle(shuffleId, numMappers, numPartitions);
+            } catch (CelebornIOException e) {
+              throw new RuntimeException(e);
+            }
+          });
+    } catch (RuntimeException e) {
+      if (e.getCause() instanceof CelebornIOException) {
+        throw (CelebornIOException) e.getCause();
+      } else {
+        throw e;
+      }
+    }
   }
 
   @Override
@@ -606,8 +617,10 @@ public class ShuffleClientImpl extends ShuffleClient {
       int shuffleId,
       int numMappers,
       int numPartitions,
-      Callable<PbRegisterShuffleResponse> callable) {
+      Callable<PbRegisterShuffleResponse> callable)
+      throws CelebornIOException {
     int numRetries = registerShuffleMaxRetries;
+    StatusCode lastFailedStatusCode = null;
     while (numRetries > 0) {
       try {
         PbRegisterShuffleResponse response = callable.call();
@@ -626,16 +639,19 @@ public class ShuffleClientImpl extends ShuffleClient {
           }
           return result;
         } else if (StatusCode.SLOT_NOT_AVAILABLE.equals(respStatus)) {
+          lastFailedStatusCode = respStatus;
           logger.error(
               "LifecycleManager request slots return {}, retry again, remain 
retry times {}.",
               StatusCode.SLOT_NOT_AVAILABLE,
               numRetries - 1);
         } else if (StatusCode.RESERVE_SLOTS_FAILED.equals(respStatus)) {
+          lastFailedStatusCode = respStatus;
           logger.error(
               "LifecycleManager request slots return {}, retry again, remain 
retry times {}.",
               StatusCode.RESERVE_SLOTS_FAILED,
               numRetries - 1);
         } else {
+          lastFailedStatusCode = respStatus;
           logger.error(
               "LifecycleManager request slots return {}, retry again, remain 
retry times {}.",
               StatusCode.REQUEST_FAILED,
@@ -648,7 +664,7 @@ public class ShuffleClientImpl extends ShuffleClient {
             numMappers,
             numPartitions,
             e);
-        break;
+        throw new CelebornIOException("Register shuffle failed for shuffle " + 
shuffleId + ".", e);
       }
 
       try {
@@ -658,8 +674,8 @@ public class ShuffleClientImpl extends ShuffleClient {
       }
       numRetries--;
     }
-
-    return null;
+    throw new CelebornIOException(
+        "Register shuffle failed for shuffle " + shuffleId + ", reason: " + 
lastFailedStatusCode);
   }
 
   protected void limitMaxInFlight(String mapKey, PushState pushState, String 
hostAndPushPort)
@@ -879,10 +895,6 @@ public class ShuffleClientImpl extends ShuffleClient {
     final ConcurrentHashMap<Integer, PartitionLocation> map =
         getPartitionLocation(shuffleId, numMappers, numPartitions);
 
-    if (map == null) {
-      throw new CelebornIOException("Register shuffle failed for shuffle " + 
shuffleId + ".");
-    }
-
     // get location
     // If rerun or speculation task running after LifecycleManager call 
stageEnd,
     // register shuffle will return an empty location map, client need revive 
for a new location.
diff --git 
a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java 
b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
index c4f161b6c..e3483fa57 100644
--- a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
@@ -36,6 +36,7 @@ import org.junit.Test;
 
 import org.apache.celeborn.client.compress.Compressor;
 import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
 import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.network.client.TransportClient;
 import org.apache.celeborn.common.network.client.TransportClientFactory;
@@ -167,7 +168,69 @@ public class ShuffleClientSuiteJ {
     }
   }
 
+  @Test
+  public void testRegisterShuffleFailed() throws IOException, 
InterruptedException {
+    setupEnv(CompressionCodec.NONE, StatusCode.SLOT_NOT_AVAILABLE);
+    try {
+      shuffleClient.pushData(
+          TEST_SHUFFLE_ID,
+          TEST_ATTEMPT_ID,
+          TEST_ATTEMPT_ID,
+          TEST_REDUCRE_ID,
+          TEST_BUF1,
+          0,
+          TEST_BUF1.length,
+          1,
+          1);
+      assert false;
+    } catch (CelebornIOException e) {
+      assert e.getMessage()
+          .contains("Register shuffle failed for shuffle 1, reason: 
SLOT_NOT_AVAILABLE");
+    }
+
+    setupEnv(CompressionCodec.NONE, StatusCode.RESERVE_SLOTS_FAILED);
+    try {
+      shuffleClient.pushData(
+          TEST_SHUFFLE_ID,
+          TEST_ATTEMPT_ID,
+          TEST_ATTEMPT_ID,
+          TEST_REDUCRE_ID,
+          TEST_BUF1,
+          0,
+          TEST_BUF1.length,
+          1,
+          1);
+      assert false;
+    } catch (CelebornIOException e) {
+      assert e.getMessage()
+          .contains("Register shuffle failed for shuffle 1, reason: 
RESERVE_SLOTS_FAILED");
+    }
+
+    setupEnv(CompressionCodec.NONE, StatusCode.REQUEST_FAILED);
+    try {
+      shuffleClient.pushData(
+          TEST_SHUFFLE_ID,
+          TEST_ATTEMPT_ID,
+          TEST_ATTEMPT_ID,
+          TEST_REDUCRE_ID,
+          TEST_BUF1,
+          0,
+          TEST_BUF1.length,
+          1,
+          1);
+      assert false;
+    } catch (CelebornIOException e) {
+      assert e.getMessage()
+          .contains("Register shuffle failed for shuffle 1, reason: 
REQUEST_FAILED");
+    }
+  }
+
   private CelebornConf setupEnv(CompressionCodec codec) throws IOException, 
InterruptedException {
+    return setupEnv(codec, StatusCode.SUCCESS);
+  }
+
+  private CelebornConf setupEnv(CompressionCodec codec, StatusCode statusCode)
+      throws IOException, InterruptedException {
     CelebornConf conf = new CelebornConf();
     conf.set(CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(), codec.name());
     conf.set(CelebornConf.CLIENT_PUSH_RETRY_THREADS().key(), "1");
@@ -180,7 +243,7 @@ public class ShuffleClientSuiteJ {
         .thenAnswer(
             t ->
                 RegisterShuffleResponse$.MODULE$.apply(
-                    StatusCode.SUCCESS, new PartitionLocation[] 
{primaryLocation}));
+                    statusCode, new PartitionLocation[] {primaryLocation}));
 
     shuffleClient.setupLifecycleManagerRef(endpointRef);
 

Reply via email to