This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.2
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.2 by this push:
     new b93927bc [CELEBORN-102][REFACTOR] TIMEOUT default value should be 
changed with network timeout (#1047)
b93927bc is described below

commit b93927bcb4b8eb5e7bbd225d2eb28f13b3075bf1
Author: Angerszhuuuu <[email protected]>
AuthorDate: Tue Dec 6 14:41:23 2022 +0800

    [CELEBORN-102][REFACTOR] TIMEOUT default value should be changed with 
network timeout (#1047)
    
    * [CELEBORN-102][REFACTOR] TIMEOUT default value should be changed with 
network timeout
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 107 +++++++++++----------
 docs/configuration/client.md                       |   5 +-
 docs/configuration/network.md                      |   3 -
 3 files changed, 62 insertions(+), 53 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 8fd05d9b..64ccfd7c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -380,18 +380,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
     new RpcTimeout(get(RPC_ASK_TIMEOUT).milli, RPC_ASK_TIMEOUT.key)
   def haClientRpcAskTimeout: RpcTimeout =
     new RpcTimeout(get(HA_CLIENT_RPC_ASK_TIMEOUT).milli, HA_CLIENT_RPC_ASK_TIMEOUT.key)
-  def registerShuffleRpcAskTimeout: RpcTimeout =
-    new RpcTimeout(
-      get(REGISTER_SHUFFLE_RPC_ASK_TIMEOUT).milli,
-      REGISTER_SHUFFLE_RPC_ASK_TIMEOUT.key)
-  def requestPartitionLocationRpcAskTimeout: RpcTimeout =
-    new RpcTimeout(
-      get(REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT).milli,
-      REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT.key)
-  def getReducerFileGroupRpcAskTimeout: RpcTimeout =
-    new RpcTimeout(
-      get(GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).milli,
-      GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key)
 
   def networkIoMode(module: String): String = {
     val key = NETWORK_IO_MODE.key.replace("<module>", module)
@@ -675,7 +663,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def pushMaxReqsInFlight: Int = get(PUSH_MAX_REQS_IN_FLIGHT)
   def pushSortMemoryThreshold: Long = get(PUSH_SORT_MEMORY_THRESHOLD)
   def pushRetryThreads: Int = get(PUSH_RETRY_THREADS)
-  def pushStageEndTimeout: Long = get(PUSH_STAGE_END_TIMEOUT)
+  def pushStageEndTimeout: Long =
+    get(PUSH_STAGE_END_TIMEOUT).getOrElse(get(RPC_ASK_TIMEOUT) * (requestCommitFilesMaxRetries + 1))
   def pushLimitInFlightTimeoutMs: Long = get(PUSH_LIMIT_IN_FLIGHT_TIMEOUT)
   def pushLimitInFlightSleepDeltaMs: Long = get(PUSH_LIMIT_IN_FLIGHT_SLEEP_INTERVAL)
   def pushSplitPartitionThreads: Int = get(PUSH_SPLIT_PARTITION_THREADS)
@@ -692,6 +681,22 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def rpcCacheExpireTime: Long = get(RPC_CACHE_EXPIRE_TIME)
   def pushDataRpcTimeoutMs = get(PUSH_DATA_RPC_TIMEOUT)
 
+  def registerShuffleRpcAskTimeout: RpcTimeout =
+    new RpcTimeout(
+      get(REGISTER_SHUFFLE_RPC_ASK_TIMEOUT).map(_.milli)
+        .getOrElse(rpcAskTimeout.duration * (reserveSlotsMaxRetries + 2)),
+      REGISTER_SHUFFLE_RPC_ASK_TIMEOUT.key)
+  def requestPartitionLocationRpcAskTimeout: RpcTimeout =
+    new RpcTimeout(
+      get(REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT).map(_.milli)
+        .getOrElse(rpcAskTimeout.duration * (reserveSlotsMaxRetries + 1)),
+      REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT.key)
+  def getReducerFileGroupRpcAskTimeout: RpcTimeout =
+    new RpcTimeout(
+      get(GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).map(_.milli)
+        .getOrElse(rpcAskTimeout.duration * (requestCommitFilesMaxRetries + 2)),
+      GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key)
+
   // //////////////////////////////////////////////////////
   //            Graceful Shutdown & Recover              //
   // //////////////////////////////////////////////////////
@@ -1035,30 +1040,6 @@ object CelebornConf extends Logging {
       .doc("Timeout for HA client RPC ask operations.")
       .fallbackConf(NETWORK_TIMEOUT)
 
-  val REGISTER_SHUFFLE_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
-    buildConf("celeborn.rpc.registerShuffle.askTimeout")
-      .categories("network")
-      .version("0.2.0")
-      .doc("Timeout for ask operations during register shuffle.")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("600s")
-
-  val REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
-    buildConf("celeborn.rpc.requestPartition.askTimeout")
-      .categories("network")
-      .version("0.2.0")
-      .doc("Timeout for ask operations during request change partition location, such as revive or split partition.")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("600s")
-
-  val GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
-    buildConf("celeborn.rpc.getReducerFileGroup.askTimeout")
-      .categories("network")
-      .version("0.2.0")
-      .doc("Timeout for ask operations during get reducer file group.")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("600s")
-
   val NETWORK_IO_MODE: ConfigEntry[String] =
     buildConf("celeborn.<module>.io.mode")
       .categories("network")
@@ -2079,14 +2060,15 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.SECONDS)
       .createWithDefaultString("10min")
 
-  val PUSH_STAGE_END_TIMEOUT: ConfigEntry[Long] =
+  val PUSH_STAGE_END_TIMEOUT: OptionalConfigEntry[Long] =
     buildConf("celeborn.push.stageEnd.timeout")
       .withAlternative("rss.stage.end.timeout")
       .categories("client")
-      .doc("Timeout for StageEnd.")
+      .doc(s"Timeout for waiting StageEnd. " +
+        s"Default value should be `${RPC_ASK_TIMEOUT.key} * (${COMMIT_FILE_REQUEST_MAX_RETRY.key} + 1)`.")
       .version("0.2.0")
       .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("240s")
+      .createOptional
 
   val PUSH_LIMIT_IN_FLIGHT_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.push.limit.inFlight.timeout")
@@ -2207,6 +2189,42 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("5s")
 
+  val PUSH_DATA_RPC_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.push.data.rpc.timeout")
+      .withAlternative("rss.push.data.rpc.timeout")
+      .categories("client")
+      .version("0.2.0")
+      .doc("Timeout for a task to push data rpc message.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("120s")
+
+  val REGISTER_SHUFFLE_RPC_ASK_TIMEOUT: OptionalConfigEntry[Long] =
+    buildConf("celeborn.rpc.registerShuffle.askTimeout")
+      .categories("client")
+      .version("0.2.0")
+      .doc(s"Timeout for ask operations during register shuffle. " +
+        s"Default value should be `${RPC_ASK_TIMEOUT.key} * (${RESERVE_SLOTS_MAX_RETRIES.key} + 1 + 1)`.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
+  val REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT: OptionalConfigEntry[Long] =
+    buildConf("celeborn.rpc.requestPartition.askTimeout")
+      .categories("client")
+      .version("0.2.0")
+      .doc(s"Timeout for ask operations during request change partition location, such as revive or split partition. " +
+        s"Default value should be `${RPC_ASK_TIMEOUT.key} * (${RESERVE_SLOTS_MAX_RETRIES.key} + 1)`.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
+  val GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT: OptionalConfigEntry[Long] =
+    buildConf("celeborn.rpc.getReducerFileGroup.askTimeout")
+      .categories("client")
+      .version("0.2.0")
+      .doc(s"Timeout for ask operations during get reducer file group. " +
+        s"Default value should be `${RPC_ASK_TIMEOUT.key} * (${COMMIT_FILE_REQUEST_MAX_RETRY.key} + 1 + 1)`.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
   val PORT_MAX_RETRY: ConfigEntry[Int] =
     buildConf("celeborn.port.maxRetries")
       .withAlternative("rss.master.port.maxretry")
@@ -2705,13 +2723,4 @@ object CelebornConf extends Logging {
       .doc("The time before a cache item is removed.")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("15s")
-
-  val PUSH_DATA_RPC_TIMEOUT: ConfigEntry[Long] =
-    buildConf("celeborn.push.data.rpc.timeout")
-      .withAlternative("rss.push.data.rpc.timeout")
-      .categories("client")
-      .version("0.2.0")
-      .doc("Timeout for a task to push data rpc message.")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("120s")
 }
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 6ffa4542..8ec3aafc 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -36,12 +36,15 @@ license: |
 | celeborn.push.retry.threads | 8 | Thread number to process shuffle re-send push data requests. | 0.2.0 | 
 | celeborn.push.sortMemory.threshold | 64m | When SortBasedPusher use memory over the threshold, will trigger push data. | 0.2.0 | 
 | celeborn.push.splitPartition.threads | 8 | Thread number to process shuffle split request in shuffle client. | 0.2.0 | 
-| celeborn.push.stageEnd.timeout | 240s | Timeout for StageEnd. | 0.2.0 | 
+| celeborn.push.stageEnd.timeout | &lt;undefined&gt; | Timeout for waiting StageEnd. Default value should be `celeborn.rpc.askTimeout * (celeborn.rpc.requestCommitFiles.maxRetries + 1)`. | 0.2.0 | 
 | celeborn.rpc.cache.concurrencyLevel | 32 | The number of write locks to update rpc cache. | 0.2.0 | 
 | celeborn.rpc.cache.expireTime | 15s | The time before a cache item is removed. | 0.2.0 | 
 | celeborn.rpc.cache.size | 256 | The max cache items count for rpc cache. | 0.2.0 | 
+| celeborn.rpc.getReducerFileGroup.askTimeout | &lt;undefined&gt; | Timeout for ask operations during get reducer file group. Default value should be `celeborn.rpc.askTimeout * (celeborn.rpc.requestCommitFiles.maxRetries + 1 + 1)`. | 0.2.0 | 
 | celeborn.rpc.maxParallelism | 1024 | Max parallelism of client on sending RPC requests. | 0.2.0 | 
+| celeborn.rpc.registerShuffle.askTimeout | &lt;undefined&gt; | Timeout for ask operations during register shuffle. Default value should be `celeborn.rpc.askTimeout * (celeborn.slots.reserve.maxRetries + 1 + 1)`. | 0.2.0 | 
 | celeborn.rpc.requestCommitFiles.maxRetries | 2 | Max retry times for requestCommitFiles RPC. | 1.0.0 | 
+| celeborn.rpc.requestPartition.askTimeout | &lt;undefined&gt; | Timeout for ask operations during request change partition location, such as revive or split partition. Default value should be `celeborn.rpc.askTimeout * (celeborn.slots.reserve.maxRetries + 1)`. | 0.2.0 | 
 | celeborn.shuffle.batchHandleChangePartition.enabled | false | When true, LifecycleManager will handle change partition request in batch. Otherwise, LifecycleManager will process the requests one by one | 0.2.0 | 
 | celeborn.shuffle.batchHandleChangePartition.interval | 100ms | Interval for LifecycleManager to schedule handling change partition requests in batch. | 0.2.0 | 
 | celeborn.shuffle.batchHandleChangePartition.threads | 8 | Threads number for LifecycleManager to handle change partition request in batch. | 0.2.0 | 
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index f26fda52..d6b01c62 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -39,10 +39,7 @@ license: |
 | celeborn.port.maxRetries | 1 | When port is occupied, we will retry for max retry times. | 0.2.0 | 
 | celeborn.rpc.askTimeout | &lt;value of celeborn.network.timeout&gt; | Timeout for RPC ask operations. | 0.2.0 | 
 | celeborn.rpc.connect.threads | 64 |  | 0.2.0 | 
-| celeborn.rpc.getReducerFileGroup.askTimeout | 600s | Timeout for ask operations during get reducer file group. | 0.2.0 | 
 | celeborn.rpc.haClient.askTimeout | &lt;value of celeborn.network.timeout&gt; | Timeout for HA client RPC ask operations. | 0.2.0 | 
 | celeborn.rpc.lookupTimeout | 30s | Timeout for RPC lookup operations. | 0.2.0 | 
-| celeborn.rpc.registerShuffle.askTimeout | 600s | Timeout for ask operations during register shuffle. | 0.2.0 | 
-| celeborn.rpc.requestPartition.askTimeout | 600s | Timeout for ask operations during request change partition location, such as revive or split partition. | 0.2.0 | 
 | celeborn.shuffle.maxChunksBeingTransferred | 9223372036854775807 | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn.shuffle.io.maxRetries` and `celeborn.shuffle.io.retryWait`), if those limits are reached the task will fail with fetch failure. | 0.2.0 | 
 <!--end-include-->

Reply via email to