This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new dc5219216 [CELEBORN-1409] CommitHandler commitFiles RPC supports
separate timeout configuration
dc5219216 is described below
commit dc5219216331be1ab0b949f681542452a9c465b1
Author: sychen <[email protected]>
AuthorDate: Mon May 6 17:42:52 2024 +0800
[CELEBORN-1409] CommitHandler commitFiles RPC supports separate timeout
configuration
### What changes were proposed in this pull request?
This PR adds a separate timeout configuration,
`celeborn.client.rpc.commitFiles.askTimeout`, for the CommitHandler
commitFiles RPC.
### Why are the changes needed?
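The default value of `celeborn.worker.commitFiles.timeout` is 120s, while the
default client RPC ask timeout is 60s. As a result, the client's commitFiles RPC
can time out while the worker is still committing files. A dedicated client-side
timeout lets users raise it to match the worker-side commit timeout. When unset,
it falls back to `celeborn.rpc.askTimeout`.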
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes #2488 from cxzl25/CELEBORN-1409.
Authored-by: sychen <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../org/apache/celeborn/client/commit/CommitHandler.scala | 3 ++-
.../main/scala/org/apache/celeborn/common/CelebornConf.scala | 12 ++++++++++++
docs/configuration/client.md | 1 +
3 files changed, 15 insertions(+), 1 deletion(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index af4bd99f4..8532599bd 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -68,6 +68,7 @@ abstract class CommitHandler(
val sharedRpcPool: ThreadPoolExecutor) extends Logging {
private val pushReplicateEnabled = conf.clientPushReplicateEnabled
+ private val clientRpcCommitFilesAskTimeout =
conf.clientRpcCommitFilesAskTimeout
private val commitEpoch = new AtomicLong()
private val totalWritten = new LongAdder
@@ -452,7 +453,7 @@ abstract class CommitHandler(
message.replicaIds)
}(ec)
} else {
- worker.endpoint.ask[CommitFilesResponse](message)
+ worker.endpoint.ask[CommitFilesResponse](message,
clientRpcCommitFilesAskTimeout)
}
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ec680fd53..edc6f25c3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -924,6 +924,11 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).milli,
CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key)
+ def clientRpcCommitFilesAskTimeout: RpcTimeout =
+ new RpcTimeout(
+ get(CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT).milli,
+ CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT.key)
+
// //////////////////////////////////////////////////////
// Shuffle Client Fetch //
// //////////////////////////////////////////////////////
@@ -4243,6 +4248,13 @@ object CelebornConf extends Logging {
s"By default, the value is the max timeout value
`${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
.fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
+ val CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.client.rpc.commitFiles.askTimeout")
+ .categories("client")
+ .version("0.4.1")
+ .doc("Timeout for CommitHandler commit files.")
+ .fallbackConf(RPC_ASK_TIMEOUT)
+
val CLIENT_RPC_CACHE_SIZE: ConfigEntry[Int] =
buildConf("celeborn.client.rpc.cache.size")
.withAlternative("celeborn.rpc.cache.size")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 64611d211..7ad3dbc5b 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -74,6 +74,7 @@ license: |
| celeborn.client.rpc.cache.concurrencyLevel | 32 | false | The number of
write locks to update rpc cache. | 0.3.0 | celeborn.rpc.cache.concurrencyLevel
|
| celeborn.client.rpc.cache.expireTime | 15s | false | The time before a cache
item is removed. | 0.3.0 | celeborn.rpc.cache.expireTime |
| celeborn.client.rpc.cache.size | 256 | false | The max cache items count for
rpc cache. | 0.3.0 | celeborn.rpc.cache.size |
+| celeborn.client.rpc.commitFiles.askTimeout | <value of
celeborn.rpc.askTimeout> | false | Timeout for CommitHandler commit files. |
0.4.1 | |
| celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of
celeborn.<module>.io.connectionTimeout> | false | Timeout for ask
operations during getting reducer file group information. During this process,
there are `celeborn.client.requestCommitFiles.maxRetries` times for retry
opportunities for committing files and 1 times for releasing slots request.
User can customize this value according to your setting. By default, the value
is the max timeout value `celeborn.<modul [...]
| celeborn.client.rpc.maxRetries | 3 | false | Max RPC retry times in
LifecycleManager. | 0.3.2 | |
| celeborn.client.rpc.registerShuffle.askTimeout | <value of
celeborn.<module>.io.connectionTimeout> | false | Timeout for ask
operations during register shuffle. During this process, there are two times
for retry opportunities for requesting slots, one request for establishing a
connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for
retry opportunities for reserving slots. User can customize this value
according to your setting. By default, the value [...]