This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new bc18c9ae3 [CELEBORN-1479] Report register shuffle failed reason in
exception
bc18c9ae3 is described below
commit bc18c9ae392ce8a0db1fa5f79540af9163ea7b41
Author: jiang13021 <[email protected]>
AuthorDate: Tue Jul 9 17:12:23 2024 +0800
[CELEBORN-1479] Report register shuffle failed reason in exception
### What changes were proposed in this pull request?
Add `ConcurrentHashMap<Integer, Exception> registerShuffleExceptions` to
ShuffleClientImpl to record the reason a shuffle registration failed.
### Why are the changes needed?
There could be various reasons for a Register Shuffle failure, such as
SLOT_NOT_AVAILABLE, RESERVE_SLOTS_FAILED, and so on. However, the current
exceptions only indicate that a shuffle registration has failed without
providing details on the cause of the failure. We are unable to determine the
exact reason for the failure unless we check the LifecycleManager logs.
With this PR, the thrown exception includes the actual reason for the
register shuffle failure.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
unit test:
org.apache.celeborn.client.ShuffleClientSuiteJ#testRegisterShuffleFailed
Closes #2590 from jiang13021/register_shuffle_failed_reason.
Authored-by: jiang13021 <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/client/ShuffleClient.java | 2 +-
.../apache/celeborn/client/ShuffleClientImpl.java | 44 +++++++++------
.../celeborn/client/ShuffleClientSuiteJ.java | 65 +++++++++++++++++++++-
3 files changed, 93 insertions(+), 18 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index aef173a62..0ea484deb 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -262,7 +262,7 @@ public abstract class ShuffleClient {
int shuffleId, int numMappers, int mapId, int attemptId, int
partitionId) throws IOException;
public abstract ConcurrentHashMap<Integer, PartitionLocation>
getPartitionLocation(
- int shuffleId, int numMappers, int numPartitions);
+ int shuffleId, int numMappers, int numPartitions) throws
CelebornIOException;
public abstract PushState getPushState(String mapKey);
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 1b22a7a62..e8770c537 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -514,7 +514,7 @@ public class ShuffleClientImpl extends ShuffleClient {
}
private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
- int shuffleId, int numMappers, int numPartitions) {
+ int shuffleId, int numMappers, int numPartitions) throws
CelebornIOException {
return registerShuffleInternal(
shuffleId,
numMappers,
@@ -548,18 +548,29 @@ public class ShuffleClientImpl extends ShuffleClient {
conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
- if (partitionLocationMap == null) {
- throw new CelebornIOException("Register shuffle failed for shuffle " +
shuffleId);
- }
-
return partitionLocationMap.get(partitionId);
}
@Override
public ConcurrentHashMap<Integer, PartitionLocation> getPartitionLocation(
- int shuffleId, int numMappers, int numPartitions) {
- return reducePartitionMap.computeIfAbsent(
- shuffleId, (id) -> registerShuffle(shuffleId, numMappers,
numPartitions));
+ int shuffleId, int numMappers, int numPartitions) throws
CelebornIOException {
+ try {
+ return reducePartitionMap.computeIfAbsent(
+ shuffleId,
+ (id) -> {
+ try {
+ return registerShuffle(shuffleId, numMappers, numPartitions);
+ } catch (CelebornIOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof CelebornIOException) {
+ throw (CelebornIOException) e.getCause();
+ } else {
+ throw e;
+ }
+ }
}
@Override
@@ -606,8 +617,10 @@ public class ShuffleClientImpl extends ShuffleClient {
int shuffleId,
int numMappers,
int numPartitions,
- Callable<PbRegisterShuffleResponse> callable) {
+ Callable<PbRegisterShuffleResponse> callable)
+ throws CelebornIOException {
int numRetries = registerShuffleMaxRetries;
+ StatusCode lastFailedStatusCode = null;
while (numRetries > 0) {
try {
PbRegisterShuffleResponse response = callable.call();
@@ -626,16 +639,19 @@ public class ShuffleClientImpl extends ShuffleClient {
}
return result;
} else if (StatusCode.SLOT_NOT_AVAILABLE.equals(respStatus)) {
+ lastFailedStatusCode = respStatus;
logger.error(
"LifecycleManager request slots return {}, retry again, remain
retry times {}.",
StatusCode.SLOT_NOT_AVAILABLE,
numRetries - 1);
} else if (StatusCode.RESERVE_SLOTS_FAILED.equals(respStatus)) {
+ lastFailedStatusCode = respStatus;
logger.error(
"LifecycleManager request slots return {}, retry again, remain
retry times {}.",
StatusCode.RESERVE_SLOTS_FAILED,
numRetries - 1);
} else {
+ lastFailedStatusCode = respStatus;
logger.error(
"LifecycleManager request slots return {}, retry again, remain
retry times {}.",
StatusCode.REQUEST_FAILED,
@@ -648,7 +664,7 @@ public class ShuffleClientImpl extends ShuffleClient {
numMappers,
numPartitions,
e);
- break;
+ throw new CelebornIOException("Register shuffle failed for shuffle " +
shuffleId + ".", e);
}
try {
@@ -658,8 +674,8 @@ public class ShuffleClientImpl extends ShuffleClient {
}
numRetries--;
}
-
- return null;
+ throw new CelebornIOException(
+ "Register shuffle failed for shuffle " + shuffleId + ", reason: " +
lastFailedStatusCode);
}
protected void limitMaxInFlight(String mapKey, PushState pushState, String
hostAndPushPort)
@@ -879,10 +895,6 @@ public class ShuffleClientImpl extends ShuffleClient {
final ConcurrentHashMap<Integer, PartitionLocation> map =
getPartitionLocation(shuffleId, numMappers, numPartitions);
- if (map == null) {
- throw new CelebornIOException("Register shuffle failed for shuffle " +
shuffleId + ".");
- }
-
// get location
// If rerun or speculation task running after LifecycleManager call
stageEnd,
// register shuffle will return an empty location map, client need revive
for a new location.
diff --git
a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
index c4f161b6c..e3483fa57 100644
--- a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
@@ -36,6 +36,7 @@ import org.junit.Test;
import org.apache.celeborn.client.compress.Compressor;
import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.network.client.TransportClient;
import org.apache.celeborn.common.network.client.TransportClientFactory;
@@ -167,7 +168,69 @@ public class ShuffleClientSuiteJ {
}
}
+ @Test
+ public void testRegisterShuffleFailed() throws IOException,
InterruptedException {
+ setupEnv(CompressionCodec.NONE, StatusCode.SLOT_NOT_AVAILABLE);
+ try {
+ shuffleClient.pushData(
+ TEST_SHUFFLE_ID,
+ TEST_ATTEMPT_ID,
+ TEST_ATTEMPT_ID,
+ TEST_REDUCRE_ID,
+ TEST_BUF1,
+ 0,
+ TEST_BUF1.length,
+ 1,
+ 1);
+ assert false;
+ } catch (CelebornIOException e) {
+ assert e.getMessage()
+ .contains("Register shuffle failed for shuffle 1, reason:
SLOT_NOT_AVAILABLE");
+ }
+
+ setupEnv(CompressionCodec.NONE, StatusCode.RESERVE_SLOTS_FAILED);
+ try {
+ shuffleClient.pushData(
+ TEST_SHUFFLE_ID,
+ TEST_ATTEMPT_ID,
+ TEST_ATTEMPT_ID,
+ TEST_REDUCRE_ID,
+ TEST_BUF1,
+ 0,
+ TEST_BUF1.length,
+ 1,
+ 1);
+ assert false;
+ } catch (CelebornIOException e) {
+ assert e.getMessage()
+ .contains("Register shuffle failed for shuffle 1, reason:
RESERVE_SLOTS_FAILED");
+ }
+
+ setupEnv(CompressionCodec.NONE, StatusCode.REQUEST_FAILED);
+ try {
+ shuffleClient.pushData(
+ TEST_SHUFFLE_ID,
+ TEST_ATTEMPT_ID,
+ TEST_ATTEMPT_ID,
+ TEST_REDUCRE_ID,
+ TEST_BUF1,
+ 0,
+ TEST_BUF1.length,
+ 1,
+ 1);
+ assert false;
+ } catch (CelebornIOException e) {
+ assert e.getMessage()
+ .contains("Register shuffle failed for shuffle 1, reason:
REQUEST_FAILED");
+ }
+ }
+
private CelebornConf setupEnv(CompressionCodec codec) throws IOException,
InterruptedException {
+ return setupEnv(codec, StatusCode.SUCCESS);
+ }
+
+ private CelebornConf setupEnv(CompressionCodec codec, StatusCode statusCode)
+ throws IOException, InterruptedException {
CelebornConf conf = new CelebornConf();
conf.set(CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(), codec.name());
conf.set(CelebornConf.CLIENT_PUSH_RETRY_THREADS().key(), "1");
@@ -180,7 +243,7 @@ public class ShuffleClientSuiteJ {
.thenAnswer(
t ->
RegisterShuffleResponse$.MODULE$.apply(
- StatusCode.SUCCESS, new PartitionLocation[]
{primaryLocation}));
+ statusCode, new PartitionLocation[] {primaryLocation}));
shuffleClient.setupLifecycleManagerRef(endpointRef);