This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new dc5219216 [CELEBORN-1409] CommitHandler commitFiles RPC supports 
separate timeout configuration
dc5219216 is described below

commit dc5219216331be1ab0b949f681542452a9c465b1
Author: sychen <[email protected]>
AuthorDate: Mon May 6 17:42:52 2024 +0800

    [CELEBORN-1409] CommitHandler commitFiles RPC supports separate timeout 
configuration
    
    ### What changes were proposed in this pull request?
    This PR aims to support a separate timeout configuration for the 
CommitHandler commitFiles RPC.
    
    ### Why are the changes needed?
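    The default value of `celeborn.worker.commitFiles.timeout` is 120s, while 
the default timeout of the client's RPC asks is 60s. As a result, the client's 
commitFiles RPC can time out before the worker has finished committing files. 
A dedicated client-side timeout lets users align the two values.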
    
    ### Does this PR introduce _any_ user-facing change?
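    No, apart from the new optional client configuration 
`celeborn.client.rpc.commitFiles.askTimeout`, which falls back to 
`celeborn.rpc.askTimeout` when unset.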
    
    ### How was this patch tested?
    GA (GitHub Actions CI).
    
    Closes #2488 from cxzl25/CELEBORN-1409.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../org/apache/celeborn/client/commit/CommitHandler.scala    |  3 ++-
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala | 12 ++++++++++++
 docs/configuration/client.md                                 |  1 +
 3 files changed, 15 insertions(+), 1 deletion(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index af4bd99f4..8532599bd 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -68,6 +68,7 @@ abstract class CommitHandler(
     val sharedRpcPool: ThreadPoolExecutor) extends Logging {
 
   private val pushReplicateEnabled = conf.clientPushReplicateEnabled
+  private val clientRpcCommitFilesAskTimeout = 
conf.clientRpcCommitFilesAskTimeout
 
   private val commitEpoch = new AtomicLong()
   private val totalWritten = new LongAdder
@@ -452,7 +453,7 @@ abstract class CommitHandler(
           message.replicaIds)
       }(ec)
     } else {
-      worker.endpoint.ask[CommitFilesResponse](message)
+      worker.endpoint.ask[CommitFilesResponse](message, 
clientRpcCommitFilesAskTimeout)
     }
   }
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ec680fd53..edc6f25c3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -924,6 +924,11 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
       get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).milli,
       CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key)
 
+  def clientRpcCommitFilesAskTimeout: RpcTimeout =
+    new RpcTimeout(
+      get(CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT).milli,
+      CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT.key)
+
   // //////////////////////////////////////////////////////
   //               Shuffle Client Fetch                  //
   // //////////////////////////////////////////////////////
@@ -4243,6 +4248,13 @@ object CelebornConf extends Logging {
         s"By default, the value is the max timeout value 
`${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
       .fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
 
+  val CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.client.rpc.commitFiles.askTimeout")
+      .categories("client")
+      .version("0.4.1")
+      .doc("Timeout for CommitHandler commit files.")
+      .fallbackConf(RPC_ASK_TIMEOUT)
+
   val CLIENT_RPC_CACHE_SIZE: ConfigEntry[Int] =
     buildConf("celeborn.client.rpc.cache.size")
       .withAlternative("celeborn.rpc.cache.size")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 64611d211..7ad3dbc5b 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -74,6 +74,7 @@ license: |
 | celeborn.client.rpc.cache.concurrencyLevel | 32 | false | The number of 
write locks to update rpc cache. | 0.3.0 | celeborn.rpc.cache.concurrencyLevel 
| 
 | celeborn.client.rpc.cache.expireTime | 15s | false | The time before a cache 
item is removed. | 0.3.0 | celeborn.rpc.cache.expireTime | 
 | celeborn.client.rpc.cache.size | 256 | false | The max cache items count for 
rpc cache. | 0.3.0 | celeborn.rpc.cache.size | 
+| celeborn.client.rpc.commitFiles.askTimeout | &lt;value of 
celeborn.rpc.askTimeout&gt; | false | Timeout for CommitHandler commit files. | 
0.4.1 |  | 
 | celeborn.client.rpc.getReducerFileGroup.askTimeout | &lt;value of 
celeborn.&lt;module&gt;.io.connectionTimeout&gt; | false | Timeout for ask 
operations during getting reducer file group information. During this process, 
there are `celeborn.client.requestCommitFiles.maxRetries` times for retry 
opportunities for committing files and 1 times for releasing slots request. 
User can customize this value according to your setting. By default, the value 
is the max timeout value `celeborn.<modul [...]
 | celeborn.client.rpc.maxRetries | 3 | false | Max RPC retry times in 
LifecycleManager. | 0.3.2 |  | 
 | celeborn.client.rpc.registerShuffle.askTimeout | &lt;value of 
celeborn.&lt;module&gt;.io.connectionTimeout&gt; | false | Timeout for ask 
operations during register shuffle. During this process, there are two times 
for retry opportunities for requesting slots, one request for establishing a 
connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for 
retry opportunities for reserving slots. User can customize this value 
according to your setting. By default, the value  [...]

Reply via email to