mridulm commented on code in PR #3008:
URL: https://github.com/apache/celeborn/pull/3008#discussion_r1894389118
##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1700,17 +1699,20 @@ private void mapEndInternal(
throws IOException {
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
PushState pushState = getPushState(mapKey);
-
try {
limitZeroInFlight(mapKey, pushState);
-
MapperEndResponse response =
- lifecycleManagerRef.askSync(
- new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId),
- ClassTag$.MODULE$.apply(MapperEndResponse.class));
+ callLifecycleManagerWithTimeoutRetry(
+ () ->
+ lifecycleManagerRef.askSync(
+ new MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId),
+ ClassTag$.MODULE$.apply(MapperEndResponse.class)),
+ "mapperEnd");
if (response.status() != StatusCode.SUCCESS) {
throw new CelebornIOException("MapperEnd failed! StatusCode: " +
response.status());
}
+ } catch (Exception e) {
Review Comment:
Catch the relevant, specific exceptions instead of a blanket `Exception`.
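A minimal sketch of the narrower catch. It assumes the RPC layer signals timeouts with an unchecked exception and other I/O failures with a checked `IOException`. `RpcCall`, `RpcTimeout`, `ShuffleIOException` and the string status are hypothetical stand-ins, not Celeborn classes. One point worth noting: `Callable.call()` is declared `throws Exception`, so a helper typed on `Callable<T>` (as `callLifecycleManagerWithTimeoutRetry` is in this diff) pushes callers toward a broad catch. A narrower functional interface avoids that.

```java
import java.io.IOException;

// Sketch: catch only the failures the remote call is expected to raise.
// RpcCall, RpcTimeout and ShuffleIOException are hypothetical stand-ins,
// not Celeborn classes; the String status stands in for StatusCode.
class SpecificCatchSketch {

  // Hypothetical unchecked timeout thrown by the RPC layer.
  static class RpcTimeout extends RuntimeException {
    RpcTimeout(String msg) {
      super(msg);
    }
  }

  // Hypothetical stand-in for CelebornIOException.
  static class ShuffleIOException extends IOException {
    ShuffleIOException(String msg, Throwable cause) {
      super(msg, cause);
    }
  }

  // Unlike Callable, whose call() throws Exception, this only declares IOException.
  interface RpcCall<T> {
    T call() throws IOException;
  }

  static String mapperEnd(RpcCall<String> ask) throws IOException {
    String status;
    try {
      status = ask.call();
    } catch (RpcTimeout e) {
      // Expected transport failure: wrap it with context as a checked exception.
      throw new ShuffleIOException("MapperEnd timed out", e);
    }
    // IOExceptions from ask.call() propagate unchanged, and unrelated runtime
    // errors (e.g. NullPointerException) are not caught, so bugs stay visible.
    if (!"SUCCESS".equals(status)) {
      throw new ShuffleIOException("MapperEnd failed! StatusCode: " + status, null);
    }
    return status;
  }
}
```

With this shape, a timeout becomes a checked, contextual failure, while a programming error such as a `NullPointerException` surfaces as itself instead of being reported as a generic "mapperEnd failed".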
##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1926,13 +1924,58 @@ public void shutdown() {
}
@Override
- public void setupLifecycleManagerRef(String host, int port) {
+ public void setupLifecycleManagerRef(String host, int port) throws CelebornIOException {
logger.info("setupLifecycleManagerRef: host = {}, port = {}", host, port);
- lifecycleManagerRef =
- rpcEnv.setupEndpointRef(new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP);
+ try {
+ lifecycleManagerRef =
+ callLifecycleManagerWithTimeoutRetry(
+ () ->
+ rpcEnv.setupEndpointRef(
+ new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP),
+ "setupLifecycleManagerRef");
+ } catch (Exception e) {
+ logger.error("setupLifecycleManagerRef failed, host = {}, port = {}", host, port);
+ throw new CelebornIOException("setupLifecycleManagerRef failed", e);
+ }
initDataClientFactoryIfNeeded();
}
+ public <T> T callLifecycleManagerWithTimeoutRetry(Callable<T> callable, String name)
Review Comment:
Instead of making changes at every call site, should we simply make askSync/askAsync retry-aware, with the number of retries passed in as a parameter (for example, for specific cases where we don't want retries)?
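One possible shape for this suggestion, as a minimal sketch: the retry loop moves into the ask path and the retry budget becomes a parameter. That way call sites such as `mapperEnd` and `setupLifecycleManagerRef` need no wrapper, and a caller that must not retry passes `0`. `Endpoint`, `RpcTimeout`, the `numRetries`/`backoff` parameters and the fixed pause are illustrative assumptions, not the signature of Celeborn's `askSync`.

```java
import java.time.Duration;

// Sketch of a retry-aware askSync. Endpoint and RpcTimeout are hypothetical
// stand-ins; this is not Celeborn's RpcEndpointRef API.
class RetryAwareAsk {

  // Hypothetical unchecked timeout raised by one request/reply round trip.
  static class RpcTimeout extends RuntimeException {
    RpcTimeout(String msg) {
      super(msg);
    }
  }

  // Hypothetical endpoint: sends one message and blocks for the reply.
  interface Endpoint {
    Object ask(Object message);
  }

  /**
   * Makes up to numRetries + 1 attempts. Only RpcTimeout is retried; any other
   * exception propagates on the first failure. numRetries = 0 keeps the
   * single-attempt behaviour for callers that must not retry.
   */
  static <T> T askSync(Endpoint ep, Object message, Class<T> replyType,
      int numRetries, Duration backoff) throws InterruptedException {
    if (numRetries < 0) {
      throw new IllegalArgumentException("numRetries must be >= 0");
    }
    RpcTimeout last = null;
    for (int attempt = 0; attempt <= numRetries; attempt++) {
      try {
        return replyType.cast(ep.ask(message));
      } catch (RpcTimeout e) {
        last = e;
        if (attempt < numRetries) {
          Thread.sleep(backoff.toMillis()); // fixed pause between attempts
        }
      }
    }
    throw last; // every attempt timed out; surface the most recent timeout
  }
}
```

Retrying only the timeout type matches what the name `callLifecycleManagerWithTimeoutRetry` suggests. Leaving other failures unretried avoids masking errors that a resend would not fix.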
--
This is an automated message from the Apache Git Service.