This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new fc459c0f7 [CELEBORN-1757] Add retry when sending RPC to LifecycleManager
fc459c0f7 is described below

commit fc459c0f7da4f0d5e307a8b29bba776509f21bae
Author: zhengtao <[email protected]>
AuthorDate: Mon Feb 17 11:27:02 2025 -0800

    [CELEBORN-1757] Add retry when sending RPC to LifecycleManager
    
    ### What changes were proposed in this pull request?
    Retry sending RPC to LifecycleManager when a TimeoutException occurs.
    
    ### Why are the changes needed?
    RPC messages are processed by `Dispatcher.threadpool`, whose numThreads depends on `numUsableCores`.
    In some cases (e.g. on Kubernetes) the LifecycleManager's numThreads is not enough for the large number of RPCs, so TimeoutExceptions occur.
    Add a retry when a TimeoutException occurs.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    Another way is to adjust the configuration `celeborn.lifecycleManager.rpc.dispatcher.threads` to increase the numThreads.
    This way is more effective.
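    
    For illustration only (not part of this patch), a rough sketch of how the new retry knobs could be combined with the dispatcher-thread setting mentioned above; the values are arbitrary examples, not recommendations:
    
    ```scala
    import org.apache.celeborn.common.CelebornConf
    
    // Hypothetical tuning example for client -> LifecycleManager RPCs.
    val conf = new CelebornConf()
      .set("celeborn.client.rpc.maxRetries", "3") // attempts when an RpcTimeoutException occurs
      .set("celeborn.client.rpc.retryWait", "1s") // upper bound of the randomized wait between attempts
      .set("celeborn.lifecycleManager.rpc.dispatcher.threads", "64") // alternative: more dispatcher threads
    ```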
    
    ### How was this patch tested?
    Cluster testing.
    
    Closes #3008 from zaynt4606/clb1757.
    
    Authored-by: zhengtao <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../apache/celeborn/client/ShuffleClientImpl.java  | 138 ++++++++++++---------
 .../celeborn/client/ShuffleClientSuiteJ.java       |  41 ++++++
 .../org/apache/celeborn/common/CelebornConf.scala  |  20 ++-
 .../celeborn/common/rpc/RpcEndpointRef.scala       |  67 ++++++++++
 .../org/apache/celeborn/common/rpc/RpcEnv.scala    |  39 ++++++
 docs/configuration/client.md                       |   3 +-
 docs/configuration/network.md                      |   1 +
 7 files changed, 245 insertions(+), 64 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 106d77c21..fe373c8d2 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -44,6 +44,7 @@ import org.apache.celeborn.client.read.CelebornInputStream;
 import org.apache.celeborn.client.read.MetricsCallback;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.exception.CelebornRuntimeException;
 import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.metrics.source.Role;
 import org.apache.celeborn.common.network.TransportContext;
@@ -83,6 +84,8 @@ public class ShuffleClientImpl extends ShuffleClient {
 
   private final int registerShuffleMaxRetries;
   private final long registerShuffleRetryWaitMs;
+  private final int rpcMaxRetries;
+  private final long rpcRetryWait;
   private final int maxReviveTimes;
   private final boolean testRetryRevive;
   private final int pushBufferMaxSize;
@@ -181,6 +184,8 @@ public class ShuffleClientImpl extends ShuffleClient {
     this.userIdentifier = userIdentifier;
     registerShuffleMaxRetries = conf.clientRegisterShuffleMaxRetry();
     registerShuffleRetryWaitMs = conf.clientRegisterShuffleRetryWaitMs();
+    rpcMaxRetries = conf.clientRpcMaxRetries();
+    rpcRetryWait = conf.clientRpcRetryWait();
     maxReviveTimes = conf.clientPushMaxReviveTimes();
     testRetryRevive = conf.testRetryRevive();
     pushBufferMaxSize = conf.clientPushBufferMaxSize();
@@ -537,6 +542,8 @@ public class ShuffleClientImpl extends ShuffleClient {
             lifecycleManagerRef.askSync(
                 RegisterShuffle$.MODULE$.apply(shuffleId, numMappers, 
numPartitions),
                 conf.clientRpcRegisterShuffleAskTimeout(),
+                rpcMaxRetries,
+                rpcRetryWait,
                 ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
   }
 
@@ -1711,6 +1718,8 @@ public class ShuffleClientImpl extends ShuffleClient {
       MapperEndResponse response =
           lifecycleManagerRef.askSync(
               new MapperEnd(shuffleId, mapId, attemptId, numMappers, 
partitionId),
+              rpcMaxRetries,
+              rpcRetryWait,
               ClassTag$.MODULE$.apply(MapperEndResponse.class));
       if (response.status() != StatusCode.SUCCESS) {
         throw new CelebornIOException("MapperEnd failed! StatusCode: " + 
response.status());
@@ -1745,69 +1754,65 @@ public class ShuffleClientImpl extends ShuffleClient {
 
   protected Tuple3<ReduceFileGroups, String, Exception> loadFileGroupInternal(
       int shuffleId, boolean isSegmentGranularityVisible) {
-    {
-      long getReducerFileGroupStartTime = System.nanoTime();
-      String exceptionMsg = null;
-      Exception exception = null;
-      try {
-        if (lifecycleManagerRef == null) {
-          exceptionMsg = "Driver endpoint is null!";
-          logger.warn(exceptionMsg);
-        } else {
-          GetReducerFileGroup getReducerFileGroup =
-              new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible);
-
-          GetReducerFileGroupResponse response =
-              lifecycleManagerRef.askSync(
-                  getReducerFileGroup,
-                  conf.clientRpcGetReducerFileGroupAskTimeout(),
-                  ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
+    long getReducerFileGroupStartTime = System.nanoTime();
+    String exceptionMsg = null;
+    Exception exception = null;
+    if (lifecycleManagerRef == null) {
+      exceptionMsg = "Driver endpoint is null!";
+      logger.warn(exceptionMsg);
+      return Tuple3.apply(null, exceptionMsg, exception);
+    }
+    try {
+      GetReducerFileGroup getReducerFileGroup =
+          new GetReducerFileGroup(shuffleId, isSegmentGranularityVisible);
 
-          switch (response.status()) {
-            case SUCCESS:
-              logger.info(
-                  "Shuffle {} request reducer file group success using {} ms, result partition size {}.",
-                  shuffleId,
-                  TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime),
-                  response.fileGroup().size());
-              return Tuple3.apply(
-                  new ReduceFileGroups(
-                      response.fileGroup(), response.attempts(), response.partitionIds()),
-                  null,
-                  null);
-            case SHUFFLE_NOT_REGISTERED:
-              logger.warn(
-                  "Request {} return {} for {}.",
-                  getReducerFileGroup,
-                  response.status(),
-                  shuffleId);
-              // return empty result
-              return Tuple3.apply(
-                  new ReduceFileGroups(
-                      response.fileGroup(), response.attempts(), response.partitionIds()),
-                  null,
-                  null);
-            case STAGE_END_TIME_OUT:
-            case SHUFFLE_DATA_LOST:
-              exceptionMsg =
-                  String.format(
-                      "Request %s return %s for %s.",
-                      getReducerFileGroup, response.status(), shuffleId);
-              logger.warn(exceptionMsg);
-              break;
-            default: // fall out
-          }
-        }
-      } catch (Exception e) {
-        if (e instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
-        logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleId, e);
-        exceptionMsg = e.getMessage();
-        exception = e;
+      GetReducerFileGroupResponse response =
+          lifecycleManagerRef.askSync(
+              getReducerFileGroup,
+              conf.clientRpcGetReducerFileGroupAskTimeout(),
+              rpcMaxRetries,
+              rpcRetryWait,
+              ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
+      switch (response.status()) {
+        case SUCCESS:
+          logger.info(
+              "Shuffle {} request reducer file group success using {} ms, result partition size {}.",
+              shuffleId,
+              TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime),
+              response.fileGroup().size());
+          return Tuple3.apply(
+              new ReduceFileGroups(
+                  response.fileGroup(), response.attempts(), response.partitionIds()),
+              null,
+              null);
+        case SHUFFLE_NOT_REGISTERED:
+          logger.warn(
+              "Request {} return {} for {}.", getReducerFileGroup, response.status(), shuffleId);
+          // return empty result
+          return Tuple3.apply(
+              new ReduceFileGroups(
+                  response.fileGroup(), response.attempts(), response.partitionIds()),
+              null,
+              null);
+        case STAGE_END_TIME_OUT:
+        case SHUFFLE_DATA_LOST:
+          exceptionMsg =
+              String.format(
+                  "Request %s return %s for %s.",
+                  getReducerFileGroup, response.status(), shuffleId);
+          logger.warn(exceptionMsg);
+          break;
+        default: // fall out
       }
-      return Tuple3.apply(null, exceptionMsg, exception);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      logger.error("Exception raised while call GetReducerFileGroup for {}.", shuffleId, e);
+      exceptionMsg = e.getMessage();
+      exception = e;
     }
+    return Tuple3.apply(null, exceptionMsg, exception);
   }
 
   @Override
@@ -1939,8 +1944,17 @@ public class ShuffleClientImpl extends ShuffleClient {
   @Override
   public void setupLifecycleManagerRef(String host, int port) {
     logger.info("setupLifecycleManagerRef: host = {}, port = {}", host, port);
-    lifecycleManagerRef =
-        rpcEnv.setupEndpointRef(new RpcAddress(host, port), RpcNameConstants.LIFECYCLE_MANAGER_EP);
+    try {
+      lifecycleManagerRef =
+          rpcEnv.setupEndpointRef(
+              new RpcAddress(host, port),
+              RpcNameConstants.LIFECYCLE_MANAGER_EP,
+              rpcMaxRetries,
+              rpcRetryWait);
+    } catch (Exception e) {
+      throw new CelebornRuntimeException("setupLifecycleManagerRef failed!", e);
+    }
+
     initDataClientFactoryIfNeeded();
   }
 
diff --git a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
index ba0a7c39b..886aff3dc 100644
--- a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
@@ -258,6 +258,12 @@ public class ShuffleClientSuiteJ {
                 RegisterShuffleResponse$.MODULE$.apply(
                     statusCode, new PartitionLocation[] {primaryLocation}));
 
+    when(endpointRef.askSync(any(), any(), any(Integer.class), any(Long.class), any()))
+        .thenAnswer(
+            t ->
+                RegisterShuffleResponse$.MODULE$.apply(
+                    statusCode, new PartitionLocation[] {primaryLocation}));
+
     shuffleClient.setupLifecycleManagerRef(endpointRef);
 
     ChannelFuture mockedFuture =
@@ -420,6 +426,14 @@ public class ShuffleClientSuiteJ {
                   StatusCode.SUCCESS, locations, new int[0], 
Collections.emptySet());
             });
 
+    when(endpointRef.askSync(any(), any(), any(Integer.class), any(Long.class), any()))
+        .thenAnswer(
+            t -> {
+              Thread.sleep(60 * 1000);
+              return GetReducerFileGroupResponse$.MODULE$.apply(
+                  StatusCode.SUCCESS, locations, new int[0], Collections.emptySet());
+            });
+
     shuffleClient =
         new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new 
UserIdentifier("mock", "mock"));
     shuffleClient.setupLifecycleManagerRef(endpointRef);
@@ -459,6 +473,13 @@ public class ShuffleClientSuiteJ {
                   StatusCode.SHUFFLE_NOT_REGISTERED, locations, new int[0], 
Collections.emptySet());
             });
 
+    when(endpointRef.askSync(any(), any(), any(Integer.class), any(Long.class), any()))
+        .thenAnswer(
+            t -> {
+              return GetReducerFileGroupResponse$.MODULE$.apply(
+                  StatusCode.SHUFFLE_NOT_REGISTERED, locations, new int[0], Collections.emptySet());
+            });
+
     shuffleClient =
         new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new 
UserIdentifier("mock", "mock"));
     shuffleClient.setupLifecycleManagerRef(endpointRef);
@@ -476,6 +497,13 @@ public class ShuffleClientSuiteJ {
                   StatusCode.STAGE_END_TIME_OUT, locations, new int[0], 
Collections.emptySet());
             });
 
+    when(endpointRef.askSync(any(), any(), any(Integer.class), any(Long.class), any()))
+        .thenAnswer(
+            t -> {
+              return GetReducerFileGroupResponse$.MODULE$.apply(
+                  StatusCode.STAGE_END_TIME_OUT, locations, new int[0], Collections.emptySet());
+            });
+
     shuffleClient =
         new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new 
UserIdentifier("mock", "mock"));
     shuffleClient.setupLifecycleManagerRef(endpointRef);
@@ -493,6 +521,13 @@ public class ShuffleClientSuiteJ {
                   StatusCode.SHUFFLE_DATA_LOST, locations, new int[0], 
Collections.emptySet());
             });
 
+    when(endpointRef.askSync(any(), any(), any(Integer.class), any(Long.class), any()))
+        .thenAnswer(
+            t -> {
+              return GetReducerFileGroupResponse$.MODULE$.apply(
+                  StatusCode.SHUFFLE_DATA_LOST, locations, new int[0], Collections.emptySet());
+            });
+
     shuffleClient =
         new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new 
UserIdentifier("mock", "mock"));
     shuffleClient.setupLifecycleManagerRef(endpointRef);
@@ -515,6 +550,12 @@ public class ShuffleClientSuiteJ {
               throw new RpcTimeoutException(
                   "Rpc timeout", new TimeoutException("ask sync timeout"));
             });
+    when(endpointRef.askSync(any(), any(), any(Integer.class), any(Long.class), any()))
+        .thenAnswer(
+            invocation -> {
+              throw new RpcTimeoutException(
+                  "Rpc timeout", new TimeoutException("ask sync timeout"));
+            });
 
     shuffleClient =
         new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new 
UserIdentifier("mock", "mock"));
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 2359c57a2..ad9b35381 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -520,6 +520,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
     new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key)
   def rpcAskTimeout: RpcTimeout =
     new RpcTimeout(get(RPC_ASK_TIMEOUT).milli, RPC_ASK_TIMEOUT.key)
+  def rpcRetryWait: Long = get(RPC_RETRY_WAIT)
   def rpcInMemoryBoundedInboxCapacity(): Int = {
     get(RPC_INBOX_CAPACITY)
   }
@@ -1017,6 +1018,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def clientRpcCacheExpireTime: Long = get(CLIENT_RPC_CACHE_EXPIRE_TIME)
   def clientRpcSharedThreads: Int = get(CLIENT_RPC_SHARED_THREADS)
   def clientRpcMaxRetries: Int = get(CLIENT_RPC_MAX_RETIRES)
+  def clientRpcRetryWait: Long = get(CLIENT_RPC_RETRY_WAIT)
   def pushDataTimeoutMs: Long = get(CLIENT_PUSH_DATA_TIMEOUT)
   def clientPushLimitStrategy: String = get(CLIENT_PUSH_LIMIT_STRATEGY)
   def clientPushSlowStartInitialSleepTime: Long = 
get(CLIENT_PUSH_SLOW_START_INITIAL_SLEEP_TIME)
@@ -1887,6 +1889,14 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("60s")
 
+  val RPC_RETRY_WAIT: ConfigEntry[Long] =
+    buildConf("celeborn.rpc.retryWait")
+      .categories("network")
+      .version("0.6.0")
+      .doc("Time to wait before next retry on RpcTimeoutException.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
+
   val RPC_DISPATCHER_THREADS: ConfigEntry[Int] =
     buildConf("celeborn.rpc.dispatcher.threads")
       .withAlternative("celeborn.rpc.dispatcher.numThreads")
@@ -4938,6 +4948,14 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("3s")
 
+  val CLIENT_RPC_RETRY_WAIT: ConfigEntry[Long] =
+    buildConf("celeborn.client.rpc.retryWait")
+      .categories("client")
+      .version("0.6.0")
+      .doc("Client-specified time to wait before next retry on RpcTimeoutException.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
+
   val CLIENT_RESERVE_SLOTS_MAX_RETRIES: ConfigEntry[Int] =
     buildConf("celeborn.client.reserveSlots.maxRetries")
       .withAlternative("celeborn.slots.reserve.maxRetries")
@@ -5087,7 +5105,7 @@ object CelebornConf extends Logging {
     buildConf("celeborn.client.rpc.maxRetries")
       .categories("client")
       .version("0.3.2")
-      .doc("Max RPC retry times in LifecycleManager.")
+      .doc("Max RPC retry times in client.")
       .intConf
       .createWithDefault(3)
 
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala
index edd7005e2..8c861cf57 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpointRef.scala
@@ -17,6 +17,9 @@
 
 package org.apache.celeborn.common.rpc
 
+import java.util.Random
+import java.util.concurrent.TimeUnit
+
 import scala.concurrent.Future
 import scala.reflect.ClassTag
 
@@ -30,6 +33,7 @@ abstract class RpcEndpointRef(conf: CelebornConf)
   extends Serializable with Logging {
 
   private[this] val defaultAskTimeout = conf.rpcAskTimeout
+  private[this] val defaultRetryWait = conf.rpcRetryWait
 
   /**
    * return the address for the [[RpcEndpointRef]]
@@ -88,4 +92,67 @@ abstract class RpcEndpointRef(conf: CelebornConf)
     val future = ask[T](message, timeout)
     timeout.awaitResult(future, address)
   }
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * default timeout, retry if timeout, throw an exception if this still fails.
+   *
+   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
+   * loop of [[RpcEndpoint]].
+   *
+   * @param message the message to send
+   * @param retryCount the number of retries for the timeout
+   * @param retryWait the waiting time for a retry
+   * @tparam T type of the reply message
+   * @return the reply message from the corresponding [[RpcEndpoint]]
+   */
+  def askSync[T: ClassTag](message: Any, retryCount: Int, retryWait: Long = defaultRetryWait): T =
+    askSync(message, defaultAskTimeout, retryCount, retryWait)
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
+   * specified timeout, retry if timeout, throw an exception if this still fails.
+   *
+   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
+   * loop of [[RpcEndpoint]].
+   *
+   * @param message the message to send
+   * @param timeout the timeout duration
+   * @param retryCount the number of retries for the timeout
+   * @param retryWait the waiting time for a retry
+   * @tparam T type of the reply message
+   * @return the reply message from the corresponding [[RpcEndpoint]]
+   */
+  def askSync[T: ClassTag](
+      message: Any,
+      timeout: RpcTimeout,
+      retryCount: Int,
+      retryWait: Long): T = {
+    var numRetries = retryCount
+    while (numRetries > 0) {
+      numRetries -= 1
+      try {
+        val future = ask[T](message, timeout)
+        return timeout.awaitResult(future, address)
+      } catch {
+        case e: RpcTimeoutException =>
+          if (numRetries > 0) {
+            val random = new Random
+            val retryWaitMs = random.nextInt(retryWait.toInt)
+            try {
+              TimeUnit.MILLISECONDS.sleep(retryWaitMs)
+            } catch {
+              case _: InterruptedException =>
+                throw e
+            }
+          } else {
+            throw e
+          }
+        case e: Exception =>
+          throw e
+      }
+    }
+    // should never be here
+    null.asInstanceOf[T]
+  }
 }
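
For reference (not part of the patch), a minimal caller-side sketch of the new retrying overload; EchoRequest/EchoResponse and the endpoint ref below are placeholders, not types from this repository:

```scala
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.rpc.RpcEndpointRef

// Placeholder message types, for illustration only.
case class EchoRequest(payload: String)
case class EchoResponse(payload: String)

def askWithRetry(ref: RpcEndpointRef, conf: CelebornConf): EchoResponse =
  ref.askSync[EchoResponse](
    EchoRequest("ping"),
    conf.rpcAskTimeout,       // RpcTimeout applied to each attempt
    conf.clientRpcMaxRetries, // attempts; retried only on RpcTimeoutException
    conf.clientRpcRetryWait)  // upper bound of the randomized wait between attempts
```
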
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
index 89973a936..19f522a0a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
@@ -18,6 +18,8 @@
 package org.apache.celeborn.common.rpc
 
 import java.io.File
+import java.util.Random
+import java.util.concurrent.TimeUnit
 
 import scala.concurrent.Future
 
@@ -109,6 +111,7 @@ object RpcEnv {
 abstract class RpcEnv(config: RpcEnvConfig) {
 
   private[celeborn] val defaultLookupTimeout = config.conf.rpcLookupTimeout
+  private[celeborn] val defaultRetryWait = config.conf.rpcRetryWait
 
   /**
    * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to 
implement
@@ -147,6 +150,42 @@ abstract class RpcEnv(config: RpcEnvConfig) {
     setupEndpointRefByAddr(RpcEndpointAddress(address, endpointName))
   }
 
+  /**
+   * Retrieve the [[RpcEndpointRef]] represented by `address` and `endpointName` with timeout retry.
+   * This is a blocking action.
+   */
+  def setupEndpointRef(
+      address: RpcAddress,
+      endpointName: String,
+      retryCount: Int,
+      retryWait: Long = defaultRetryWait): RpcEndpointRef = {
+    var numRetries = retryCount
+    while (numRetries > 0) {
+      numRetries -= 1
+      try {
+        return setupEndpointRefByAddr(RpcEndpointAddress(address, endpointName))
+      } catch {
+        case e: RpcTimeoutException =>
+          if (numRetries > 0) {
+            val random = new Random
+            val retryWaitMs = random.nextInt(retryWait.toInt)
+            try {
+              TimeUnit.MILLISECONDS.sleep(retryWaitMs)
+            } catch {
+              case _: InterruptedException =>
+                throw e
+            }
+          } else {
+            throw e
+          }
+        case e: Exception =>
+          throw e
+      }
+    }
+    // should never be here
+    null
+  }
+
   /**
    * Stop [[RpcEndpoint]] specified by `endpoint`.
    */
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index fd28441e3..c576e0c4d 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -80,10 +80,11 @@ license: |
 | celeborn.client.rpc.cache.size | 256 | false | The max cache items count for 
rpc cache. | 0.3.0 | celeborn.rpc.cache.size | 
 | celeborn.client.rpc.commitFiles.askTimeout | &lt;value of 
celeborn.rpc.askTimeout&gt; | false | Timeout for CommitHandler commit files. | 
0.4.1 |  | 
 | celeborn.client.rpc.getReducerFileGroup.askTimeout | &lt;value of 
celeborn.rpc.askTimeout&gt; | false | Timeout for ask operations during getting 
reducer file group information. During this process, there are 
`celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities 
for committing files and 1 times for releasing slots request. User can 
customize this value according to your setting. | 0.2.0 |  | 
-| celeborn.client.rpc.maxRetries | 3 | false | Max RPC retry times in LifecycleManager. | 0.3.2 |  | 
+| celeborn.client.rpc.maxRetries | 3 | false | Max RPC retry times in client. | 0.3.2 |  | 
 | celeborn.client.rpc.registerShuffle.askTimeout | &lt;value of 
celeborn.rpc.askTimeout&gt; | false | Timeout for ask operations during 
register shuffle. During this process, there are two times for retry 
opportunities for requesting slots, one request for establishing a connection 
with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry 
opportunities for reserving slots. User can customize this value according to 
your setting. | 0.3.0 | celeborn.rpc.registerShuffle.askT [...]
 | celeborn.client.rpc.requestPartition.askTimeout | &lt;value of 
celeborn.rpc.askTimeout&gt; | false | Timeout for ask operations during 
requesting change partition location, such as reviving or splitting partition. 
During this process, there are `celeborn.client.reserveSlots.maxRetries` times 
for retry opportunities for reserving slots. User can customize this value 
according to your setting. | 0.2.0 |  | 
 | celeborn.client.rpc.reserveSlots.askTimeout | &lt;value of 
celeborn.rpc.askTimeout&gt; | false | Timeout for LifecycleManager request 
reserve slots. | 0.3.0 |  | 
+| celeborn.client.rpc.retryWait | 1s | false | Client-specified time to wait before next retry on RpcTimeoutException. | 0.6.0 |  | 
 | celeborn.client.rpc.shared.threads | 16 | false | Number of shared rpc 
threads in LifecycleManager. | 0.3.2 |  | 
 | celeborn.client.shuffle.batchHandleChangePartition.interval | 100ms | false 
| Interval for LifecycleManager to schedule handling change partition requests 
in batch. | 0.3.0 | celeborn.shuffle.batchHandleChangePartition.interval | 
 | celeborn.client.shuffle.batchHandleChangePartition.partitionBuckets | 256 | 
false | Max number of change partition requests which can be concurrently 
processed. | 0.5.0 |  | 
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index c958451ba..50b56c176 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -56,6 +56,7 @@ license: |
 | celeborn.rpc.inbox.capacity | 0 | false | Specifies size of the in memory 
bounded capacity. | 0.5.0 |  | 
 | celeborn.rpc.io.threads | &lt;undefined&gt; | false | Netty IO thread number 
of NettyRpcEnv to handle RPC request. The default threads number is the number 
of runtime available processors. | 0.2.0 |  | 
 | celeborn.rpc.lookupTimeout | 30s | false | Timeout for RPC lookup 
operations. | 0.2.0 |  | 
+| celeborn.rpc.retryWait | 1s | false | Time to wait before next retry on RpcTimeoutException. | 0.6.0 |  | 
 | celeborn.rpc.slow.interval | &lt;undefined&gt; | false | min interval (ms) 
for RPC framework to log slow RPC | 0.6.0 |  | 
 | celeborn.rpc.slow.threshold | 1s | false | threshold for RPC framework to 
log slow RPC | 0.6.0 |  | 
 | celeborn.shuffle.io.maxChunksBeingTransferred | &lt;undefined&gt; | false | 
The max number of chunks allowed to be transferred at the same time on shuffle 
service. Note that new incoming connections will be closed when the max number 
is hit. The client will retry according to the shuffle retry configs (see 
`celeborn.<module>.io.maxRetries` and `celeborn.<module>.io.retryWait`), if 
those limits are reached the task will fail with fetch failure. | 0.2.0 |  | 
