This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new 15de4e544 [CELEBORN-1412] celeborn.client.rpc.*.askTimeout should 
fallback to celeborn.rpc.askTimeout
15de4e544 is described below

commit 15de4e5440f454a2a6ecdf708757b03ff082fd9a
Author: SteNicholas <[email protected]>
AuthorDate: Tue May 7 13:47:22 2024 +0800

    [CELEBORN-1412] celeborn.client.rpc.*.askTimeout should fallback to 
celeborn.rpc.askTimeout
    
    ### What changes were proposed in this pull request?
    
    `celeborn.client.rpc.*.askTimeout` should fall back to 
`celeborn.rpc.askTimeout`.
    
    ### Why are the changes needed?
    
    The `celeborn.client.rpc.*.askTimeout` config options, namely 
`celeborn.client.rpc.getReducerFileGroup.askTimeout`, 
`celeborn.client.rpc.registerShuffle.askTimeout` and 
`celeborn.client.rpc.requestPartition.askTimeout`, should fall back to 
`celeborn.rpc.askTimeout` instead of 
`celeborn.<module>.io.connectionTimeout`.
    
    ### Does this PR introduce _any_ user-facing change?
    
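    Yes. When not set explicitly, the `celeborn.client.rpc.*.askTimeout` 
options now take their value from `celeborn.rpc.askTimeout` instead of 
`celeborn.<module>.io.connectionTimeout`; this is recorded in 
`docs/migration.md`.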
    
    ### How was this patch tested?
    
    GA.
    
    Closes #2492 from SteNicholas/CELEBORN-1412.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit 1cd231f5e036f59f2a3981b683fcb34685a28e72)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../flink/readclient/FlinkShuffleClientImpl.java   |  2 +-
 .../apache/celeborn/client/ShuffleClientImpl.java  | 26 ++++++---------
 .../celeborn/client/ShuffleClientHelper.scala      |  2 +-
 .../org/apache/celeborn/common/CelebornConf.scala  | 39 ++++++++++------------
 .../apache/celeborn/common/CelebornConfSuite.scala |  6 ++--
 docs/configuration/client.md                       |  6 ++--
 docs/migration.md                                  |  2 ++
 7 files changed, 39 insertions(+), 44 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index 6ced153c4..b92f82325 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -439,7 +439,7 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
     PbChangeLocationResponse response =
         lifecycleManagerRef.askSync(
             ControlMessages.Revive$.MODULE$.apply(shuffleId, mapIds, requests),
-            conf.clientRpcRequestPartitionLocationRpcAskTimeout(),
+            conf.clientRpcRequestPartitionLocationAskTimeout(),
             ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));
     // per partitionKey only serve single PartitionLocation in Client Cache.
     PbChangeLocationPartitionInfo partitionInfo = response.getPartitionInfo(0);
diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index d1d70764d..372bab262 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -456,9 +456,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                   requests,
                   remainReviveTimes - 1,
                   System.currentTimeMillis()
-                      + conf.clientRpcRequestPartitionLocationRpcAskTimeout()
-                          .duration()
-                          .toMillis()));
+                      + 
conf.clientRpcRequestPartitionLocationAskTimeout().duration().toMillis()));
     }
   }
 
@@ -479,7 +477,7 @@ public class ShuffleClientImpl extends ShuffleClient {
         () ->
             lifecycleManagerRef.askSync(
                 RegisterShuffle$.MODULE$.apply(shuffleId, numMappers, 
numPartitions),
-                conf.clientRpcRegisterShuffleRpcAskTimeout(),
+                conf.clientRpcRegisterShuffleAskTimeout(),
                 ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
   }
 
@@ -502,7 +500,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                 lifecycleManagerRef.askSync(
                     RegisterMapPartitionTask$.MODULE$.apply(
                         shuffleId, numMappers, mapId, attemptId, partitionId),
-                    conf.clientRpcRegisterShuffleRpcAskTimeout(),
+                    conf.clientRpcRegisterShuffleAskTimeout(),
                     ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
 
     if (partitionLocationMap == null) {
@@ -538,7 +536,7 @@ public class ShuffleClientImpl extends ShuffleClient {
           PbGetShuffleIdResponse pbGetShuffleIdResponse =
               lifecycleManagerRef.askSync(
                   pbGetShuffleId,
-                  conf.clientRpcRegisterShuffleRpcAskTimeout(),
+                  conf.clientRpcRegisterShuffleAskTimeout(),
                   ClassTag$.MODULE$.apply(PbGetShuffleIdResponse.class));
           return pbGetShuffleIdResponse.getShuffleId();
         });
@@ -554,7 +552,7 @@ public class ShuffleClientImpl extends ShuffleClient {
     PbReportShuffleFetchFailureResponse pbReportShuffleFetchFailureResponse =
         lifecycleManagerRef.askSync(
             pbReportShuffleFetchFailure,
-            conf.clientRpcRegisterShuffleRpcAskTimeout(),
+            conf.clientRpcRegisterShuffleAskTimeout(),
             
ClassTag$.MODULE$.apply(PbReportShuffleFetchFailureResponse.class));
     return pbReportShuffleFetchFailureResponse.getSuccess();
   }
@@ -744,7 +742,7 @@ public class ShuffleClientImpl extends ShuffleClient {
       PbChangeLocationResponse response =
           lifecycleManagerRef.askSync(
               Revive$.MODULE$.apply(shuffleId, mapIds, requests),
-              conf.clientRpcRequestPartitionLocationRpcAskTimeout(),
+              conf.clientRpcRequestPartitionLocationAskTimeout(),
               ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));
 
       for (int i = 0; i < response.getEndedMapIdCount(); i++) {
@@ -991,7 +989,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                   reviveManager.addRequest(reviveRequest);
                   long dueTime =
                       System.currentTimeMillis()
-                          + 
conf.clientRpcRequestPartitionLocationRpcAskTimeout()
+                          + conf.clientRpcRequestPartitionLocationAskTimeout()
                               .duration()
                               .toMillis();
                   pushDataRetryPool.submit(
@@ -1077,9 +1075,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                 reviveManager.addRequest(reviveRequest);
                 long dueTime =
                     System.currentTimeMillis()
-                        + conf.clientRpcRequestPartitionLocationRpcAskTimeout()
-                            .duration()
-                            .toMillis();
+                        + 
conf.clientRpcRequestPartitionLocationAskTimeout().duration().toMillis();
                 pushDataRetryPool.submit(
                     () ->
                         submitRetryPushData(
@@ -1385,7 +1381,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                             requests,
                             remainReviveTimes,
                             System.currentTimeMillis()
-                                + 
conf.clientRpcRequestPartitionLocationRpcAskTimeout()
+                                + 
conf.clientRpcRequestPartitionLocationAskTimeout()
                                     .duration()
                                     .toMillis()));
               } else if (reason == 
StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue()) {
@@ -1465,7 +1461,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                           requests,
                           remainReviveTimes - 1,
                           System.currentTimeMillis()
-                              + 
conf.clientRpcRequestPartitionLocationRpcAskTimeout()
+                              + 
conf.clientRpcRequestPartitionLocationAskTimeout()
                                   .duration()
                                   .toMillis()));
             } else {
@@ -1583,7 +1579,7 @@ public class ShuffleClientImpl extends ShuffleClient {
           GetReducerFileGroupResponse response =
               lifecycleManagerRef.askSync(
                   getReducerFileGroup,
-                  conf.clientRpcGetReducerFileGroupRpcAskTimeout(),
+                  conf.clientRpcGetReducerFileGroupAskTimeout(),
                   ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
 
           switch (response.status()) {
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala 
b/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala
index b94ba37c7..c1e8e3a77 100644
--- a/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala
@@ -40,7 +40,7 @@ object ShuffleClientHelper extends Logging {
       shuffleLocs: ConcurrentHashMap[Integer, PartitionLocation]): Unit = {
     endpointRef.ask[PbChangeLocationResponse](
       req,
-      conf.clientRpcRequestPartitionLocationRpcAskTimeout).onComplete {
+      conf.clientRpcRequestPartitionLocationAskTimeout).onComplete {
       case Success(resp) =>
         val partitionInfo = resp.getPartitionInfo(0)
         val respStatus = Utils.toStatusCode(partitionInfo.getStatus)
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 6ff18b5a6..5c1cc0865 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -782,20 +782,20 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
       get(CLIENT_RESERVE_SLOTS_RPC_TIMEOUT).milli,
       CLIENT_RESERVE_SLOTS_RPC_TIMEOUT.key)
 
-  def clientRpcRegisterShuffleRpcAskTimeout: RpcTimeout =
+  def clientRpcRegisterShuffleAskTimeout: RpcTimeout =
     new RpcTimeout(
-      get(CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT).milli,
-      CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT.key)
+      get(CLIENT_RPC_REGISTER_SHUFFLE_ASK_TIMEOUT).milli,
+      CLIENT_RPC_REGISTER_SHUFFLE_ASK_TIMEOUT.key)
 
-  def clientRpcRequestPartitionLocationRpcAskTimeout: RpcTimeout =
+  def clientRpcRequestPartitionLocationAskTimeout: RpcTimeout =
     new RpcTimeout(
-      get(CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT).milli,
-      CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT.key)
+      get(CLIENT_RPC_REQUEST_PARTITION_LOCATION_ASK_TIMEOUT).milli,
+      CLIENT_RPC_REQUEST_PARTITION_LOCATION_ASK_TIMEOUT.key)
 
-  def clientRpcGetReducerFileGroupRpcAskTimeout: RpcTimeout =
+  def clientRpcGetReducerFileGroupAskTimeout: RpcTimeout =
     new RpcTimeout(
-      get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).milli,
-      CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key)
+      get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_ASK_TIMEOUT).milli,
+      CLIENT_RPC_GET_REDUCER_FILE_GROUP_ASK_TIMEOUT.key)
 
   def clientRpcCommitFilesAskTimeout: RpcTimeout =
     new RpcTimeout(
@@ -3767,7 +3767,7 @@ object CelebornConf extends Logging {
       .doc("Timeout for LifecycleManager request reserve slots.")
       .fallbackConf(RPC_ASK_TIMEOUT)
 
-  val CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
+  val CLIENT_RPC_REGISTER_SHUFFLE_ASK_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.client.rpc.registerShuffle.askTimeout")
       .withAlternative("celeborn.rpc.registerShuffle.askTimeout")
       .categories("client")
@@ -3776,29 +3776,26 @@ object CelebornConf extends Logging {
         s"During this process, there are two times for retry opportunities for 
requesting slots, " +
         s"one request for establishing a connection with Worker and " +
         s"`${CLIENT_RESERVE_SLOTS_MAX_RETRIES.key}` times for retry 
opportunities for reserving slots. " +
-        s"User can customize this value according to your setting. " +
-        s"By default, the value is the max timeout value 
`${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
-      .fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
+        s"User can customize this value according to your setting.")
+      .fallbackConf(RPC_ASK_TIMEOUT)
 
-  val CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT: ConfigEntry[Long] 
=
+  val CLIENT_RPC_REQUEST_PARTITION_LOCATION_ASK_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.client.rpc.requestPartition.askTimeout")
       .categories("client")
       .version("0.2.0")
       .doc(s"Timeout for ask operations during requesting change partition 
location, such as reviving or splitting partition. " +
         s"During this process, there are 
`${CLIENT_RESERVE_SLOTS_MAX_RETRIES.key}` times for retry opportunities for 
reserving slots. " +
-        s"User can customize this value according to your setting. " +
-        s"By default, the value is the max timeout value 
`${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
-      .fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
+        s"User can customize this value according to your setting.")
+      .fallbackConf(RPC_ASK_TIMEOUT)
 
-  val CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
+  val CLIENT_RPC_GET_REDUCER_FILE_GROUP_ASK_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.client.rpc.getReducerFileGroup.askTimeout")
       .categories("client")
       .version("0.2.0")
       .doc(s"Timeout for ask operations during getting reducer file group 
information. " +
         s"During this process, there are 
`${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for 
committing files " +
-        s"and 1 times for releasing slots request. User can customize this 
value according to your setting. " +
-        s"By default, the value is the max timeout value 
`${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
-      .fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
+        s"and 1 times for releasing slots request. User can customize this 
value according to your setting.")
+      .fallbackConf(RPC_ASK_TIMEOUT)
 
   val CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.client.rpc.commitFiles.askTimeout")
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 736a44619..9f398fcd8 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -190,9 +190,9 @@ class CelebornConfSuite extends CelebornFunSuite {
     assert(conf.networkTimeout.duration.toMillis == 20000L)
     assert(conf.networkIoConnectionTimeoutMs("data") == 20000L)
     assert(conf.clientPushStageEndTimeout == 20000L)
-    assert(conf.clientRpcRegisterShuffleRpcAskTimeout.duration.toMillis == 
20000L)
-    
assert(conf.clientRpcRequestPartitionLocationRpcAskTimeout.duration.toMillis == 
20000L)
-    assert(conf.clientRpcGetReducerFileGroupRpcAskTimeout.duration.toMillis == 
20000L)
+    assert(conf.clientRpcRegisterShuffleAskTimeout.duration.toMillis == 1000L)
+    assert(conf.clientRpcRequestPartitionLocationAskTimeout.duration.toMillis 
== 1000L)
+    assert(conf.clientRpcGetReducerFileGroupAskTimeout.duration.toMillis == 
1000L)
     assert(conf.networkConnectTimeout.duration.toMillis == 2000L)
     assert(conf.networkIoConnectTimeoutMs("data") == 2000L)
   }
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 929d70688..304a84e60 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -78,10 +78,10 @@ license: |
 | celeborn.client.rpc.cache.expireTime | 15s | The time before a cache item is 
removed. | 0.3.0 | celeborn.rpc.cache.expireTime | 
 | celeborn.client.rpc.cache.size | 256 | The max cache items count for rpc 
cache. | 0.3.0 | celeborn.rpc.cache.size | 
 | celeborn.client.rpc.commitFiles.askTimeout | &lt;value of 
celeborn.rpc.askTimeout&gt; | Timeout for CommitHandler commit files. | 0.4.1 | 
 | 
-| celeborn.client.rpc.getReducerFileGroup.askTimeout | &lt;value of 
celeborn.&lt;module&gt;.io.connectionTimeout&gt; | Timeout for ask operations 
during getting reducer file group information. During this process, there are 
`celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities 
for committing files and 1 times for releasing slots request. User can 
customize this value according to your setting. By default, the value is the 
max timeout value `celeborn.<module>.io.co [...]
+| celeborn.client.rpc.getReducerFileGroup.askTimeout | &lt;value of 
celeborn.rpc.askTimeout&gt; | Timeout for ask operations during getting reducer 
file group information. During this process, there are 
`celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities 
for committing files and 1 times for releasing slots request. User can 
customize this value according to your setting. | 0.2.0 |  | 
 | celeborn.client.rpc.maxRetries | 3 | Max RPC retry times in 
LifecycleManager. | 0.3.2 |  | 
-| celeborn.client.rpc.registerShuffle.askTimeout | &lt;value of 
celeborn.&lt;module&gt;.io.connectionTimeout&gt; | Timeout for ask operations 
during register shuffle. During this process, there are two times for retry 
opportunities for requesting slots, one request for establishing a connection 
with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry 
opportunities for reserving slots. User can customize this value according to 
your setting. By default, the value is the m [...]
-| celeborn.client.rpc.requestPartition.askTimeout | &lt;value of 
celeborn.&lt;module&gt;.io.connectionTimeout&gt; | Timeout for ask operations 
during requesting change partition location, such as reviving or splitting 
partition. During this process, there are 
`celeborn.client.reserveSlots.maxRetries` times for retry opportunities for 
reserving slots. User can customize this value according to your setting. By 
default, the value is the max timeout value `celeborn.<module>.io.connectionTim 
[...]
+| celeborn.client.rpc.registerShuffle.askTimeout | &lt;value of 
celeborn.rpc.askTimeout&gt; | Timeout for ask operations during register 
shuffle. During this process, there are two times for retry opportunities for 
requesting slots, one request for establishing a connection with Worker and 
`celeborn.client.reserveSlots.maxRetries` times for retry opportunities for 
reserving slots. User can customize this value according to your setting. | 
0.3.0 | celeborn.rpc.registerShuffle.askTimeout | 
+| celeborn.client.rpc.requestPartition.askTimeout | &lt;value of 
celeborn.rpc.askTimeout&gt; | Timeout for ask operations during requesting 
change partition location, such as reviving or splitting partition. During this 
process, there are `celeborn.client.reserveSlots.maxRetries` times for retry 
opportunities for reserving slots. User can customize this value according to 
your setting. | 0.2.0 |  | 
 | celeborn.client.rpc.reserveSlots.askTimeout | &lt;value of 
celeborn.rpc.askTimeout&gt; | Timeout for LifecycleManager request reserve 
slots. | 0.3.0 |  | 
 | celeborn.client.rpc.shared.threads | 16 | Number of shared rpc threads in 
LifecycleManager. | 0.3.2 |  | 
 | celeborn.client.shuffle.batchHandleChangePartition.interval | 100ms | 
Interval for LifecycleManager to schedule handling change partition requests in 
batch. | 0.3.0 | celeborn.shuffle.batchHandleChangePartition.interval | 
diff --git a/docs/migration.md b/docs/migration.md
index 6128357e7..21f29509f 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -26,6 +26,8 @@ license: |
 - Since 0.4.1, Celeborn master adds a limit to the estimated partition size 
used for computing worker slots. 
   This size is now constrained within the range specified by 
`celeborn.master.estimatedPartitionSize.minSize` and 
`celeborn.master.estimatedPartitionSize.maxSize`.
 
+- Since 0.4.1, Celeborn changed the fallback configuration of 
`celeborn.client.rpc.getReducerFileGroup.askTimeout`, 
`celeborn.client.rpc.registerShuffle.askTimeout` and 
`celeborn.client.rpc.requestPartition.askTimeout` from 
`celeborn.<module>.io.connectionTimeout` to `celeborn.rpc.askTimeout`.
+
 ## Upgrading from 0.3 to 0.4
 
 - Since 0.4.0, Celeborn won't be compatible with Celeborn client that versions 
below 0.3.0.

Reply via email to