This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git

commit 8cf9a49945440f79ab3da6c4bdf0280e177077a6
Author: sychen <[email protected]>
AuthorDate: Mon May 6 17:42:52 2024 +0800

    [CELEBORN-1409] CommitHandler commitFiles RPC supports separate timeout configuration
    
    This PR adds a separate timeout configuration for the commitFiles RPC issued by CommitHandler.
    
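    The default value of `celeborn.worker.commitFiles.timeout` is 120s, while the default client RPC ask timeout (`celeborn.rpc.askTimeout`) is only 60s. As a result, a client's commitFiles request can time out before the Worker has finished committing files.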
    
    No
    
    GA
    
    Closes #2488 from cxzl25/CELEBORN-1409.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit dc5219216331be1ab0b949f681542452a9c465b1)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../org/apache/celeborn/client/commit/CommitHandler.scala    |  3 ++-
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala | 12 ++++++++++++
 docs/configuration/client.md                                 |  1 +
 3 files changed, 15 insertions(+), 1 deletion(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index af4bd99f4..8532599bd 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -68,6 +68,7 @@ abstract class CommitHandler(
     val sharedRpcPool: ThreadPoolExecutor) extends Logging {
 
   private val pushReplicateEnabled = conf.clientPushReplicateEnabled
+  private val clientRpcCommitFilesAskTimeout = 
conf.clientRpcCommitFilesAskTimeout
 
   private val commitEpoch = new AtomicLong()
   private val totalWritten = new LongAdder
@@ -452,7 +453,7 @@ abstract class CommitHandler(
           message.replicaIds)
       }(ec)
     } else {
-      worker.endpoint.ask[CommitFilesResponse](message)
+      worker.endpoint.ask[CommitFilesResponse](message, 
clientRpcCommitFilesAskTimeout)
     }
   }
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 261e97478..6ff18b5a6 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -797,6 +797,11 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
       get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).milli,
       CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key)
 
+  def clientRpcCommitFilesAskTimeout: RpcTimeout =
+    new RpcTimeout(
+      get(CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT).milli,
+      CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT.key)
+
   // //////////////////////////////////////////////////////
   //               Shuffle Client Fetch                  //
   // //////////////////////////////////////////////////////
@@ -3795,6 +3800,13 @@ object CelebornConf extends Logging {
         s"By default, the value is the max timeout value 
`${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
       .fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
 
+  val CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.client.rpc.commitFiles.askTimeout")
+      .categories("client")
+      .version("0.4.1")
+      .doc("Timeout for CommitHandler commit files.")
+      .fallbackConf(RPC_ASK_TIMEOUT)
+
   val CLIENT_RPC_CACHE_SIZE: ConfigEntry[Int] =
     buildConf("celeborn.client.rpc.cache.size")
       .withAlternative("celeborn.rpc.cache.size")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index ba4863150..929d70688 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -77,6 +77,7 @@ license: |
 | celeborn.client.rpc.cache.concurrencyLevel | 32 | The number of write locks 
to update rpc cache. | 0.3.0 | celeborn.rpc.cache.concurrencyLevel | 
 | celeborn.client.rpc.cache.expireTime | 15s | The time before a cache item is 
removed. | 0.3.0 | celeborn.rpc.cache.expireTime | 
 | celeborn.client.rpc.cache.size | 256 | The max cache items count for rpc 
cache. | 0.3.0 | celeborn.rpc.cache.size | 
+| celeborn.client.rpc.commitFiles.askTimeout | &lt;value of 
celeborn.rpc.askTimeout&gt; | Timeout for CommitHandler commit files. | 0.4.1 | 
 | 
 | celeborn.client.rpc.getReducerFileGroup.askTimeout | &lt;value of 
celeborn.&lt;module&gt;.io.connectionTimeout&gt; | Timeout for ask operations 
during getting reducer file group information. During this process, there are 
`celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities 
for committing files and 1 times for releasing slots request. User can 
customize this value according to your setting. By default, the value is the 
max timeout value `celeborn.<module>.io.co [...]
 | celeborn.client.rpc.maxRetries | 3 | Max RPC retry times in 
LifecycleManager. | 0.3.2 |  | 
 | celeborn.client.rpc.registerShuffle.askTimeout | &lt;value of 
celeborn.&lt;module&gt;.io.connectionTimeout&gt; | Timeout for ask operations 
during register shuffle. During this process, there are two times for retry 
opportunities for requesting slots, one request for establishing a connection 
with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry 
opportunities for reserving slots. User can customize this value according to 
your setting. By default, the value is the m [...]

Reply via email to