This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new de3ef0d6 [CELEBORN-102][REFACTOR] TIMEOUT default value should be changed with network timeout (#1047)
de3ef0d6 is described below
commit de3ef0d69423ec82dab4851394238f725edc5d7f
Author: Angerszhuuuu <[email protected]>
AuthorDate: Tue Dec 6 14:41:23 2022 +0800
[CELEBORN-102][REFACTOR] TIMEOUT default value should be changed with network timeout (#1047)
* [CELEBORN-102][REFACTOR] TIMEOUT default value should be changed with
network timeout
---
.../org/apache/celeborn/common/CelebornConf.scala | 107 +++++++++++----------
docs/configuration/client.md | 5 +-
docs/configuration/network.md | 3 -
3 files changed, 62 insertions(+), 53 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 8fd05d9b..64ccfd7c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -380,18 +380,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
new RpcTimeout(get(RPC_ASK_TIMEOUT).milli, RPC_ASK_TIMEOUT.key)
def haClientRpcAskTimeout: RpcTimeout =
new RpcTimeout(get(HA_CLIENT_RPC_ASK_TIMEOUT).milli,
HA_CLIENT_RPC_ASK_TIMEOUT.key)
- def registerShuffleRpcAskTimeout: RpcTimeout =
- new RpcTimeout(
- get(REGISTER_SHUFFLE_RPC_ASK_TIMEOUT).milli,
- REGISTER_SHUFFLE_RPC_ASK_TIMEOUT.key)
- def requestPartitionLocationRpcAskTimeout: RpcTimeout =
- new RpcTimeout(
- get(REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT).milli,
- REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT.key)
- def getReducerFileGroupRpcAskTimeout: RpcTimeout =
- new RpcTimeout(
- get(GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).milli,
- GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key)
def networkIoMode(module: String): String = {
val key = NETWORK_IO_MODE.key.replace("<module>", module)
@@ -675,7 +663,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def pushMaxReqsInFlight: Int = get(PUSH_MAX_REQS_IN_FLIGHT)
def pushSortMemoryThreshold: Long = get(PUSH_SORT_MEMORY_THRESHOLD)
def pushRetryThreads: Int = get(PUSH_RETRY_THREADS)
- def pushStageEndTimeout: Long = get(PUSH_STAGE_END_TIMEOUT)
+ def pushStageEndTimeout: Long =
+ get(PUSH_STAGE_END_TIMEOUT).getOrElse(get(RPC_ASK_TIMEOUT) *
(requestCommitFilesMaxRetries + 1))
def pushLimitInFlightTimeoutMs: Long = get(PUSH_LIMIT_IN_FLIGHT_TIMEOUT)
def pushLimitInFlightSleepDeltaMs: Long =
get(PUSH_LIMIT_IN_FLIGHT_SLEEP_INTERVAL)
def pushSplitPartitionThreads: Int = get(PUSH_SPLIT_PARTITION_THREADS)
@@ -692,6 +681,22 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def rpcCacheExpireTime: Long = get(RPC_CACHE_EXPIRE_TIME)
def pushDataRpcTimeoutMs = get(PUSH_DATA_RPC_TIMEOUT)
+ def registerShuffleRpcAskTimeout: RpcTimeout =
+ new RpcTimeout(
+ get(REGISTER_SHUFFLE_RPC_ASK_TIMEOUT).map(_.milli)
+ .getOrElse(rpcAskTimeout.duration * (reserveSlotsMaxRetries + 2)),
+ REGISTER_SHUFFLE_RPC_ASK_TIMEOUT.key)
+ def requestPartitionLocationRpcAskTimeout: RpcTimeout =
+ new RpcTimeout(
+ get(REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT).map(_.milli)
+ .getOrElse(rpcAskTimeout.duration * (reserveSlotsMaxRetries + 1)),
+ REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT.key)
+ def getReducerFileGroupRpcAskTimeout: RpcTimeout =
+ new RpcTimeout(
+ get(GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).map(_.milli)
+ .getOrElse(rpcAskTimeout.duration * (requestCommitFilesMaxRetries +
2)),
+ GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key)
+
// //////////////////////////////////////////////////////
// Graceful Shutdown & Recover //
// //////////////////////////////////////////////////////
@@ -1035,30 +1040,6 @@ object CelebornConf extends Logging {
.doc("Timeout for HA client RPC ask operations.")
.fallbackConf(NETWORK_TIMEOUT)
- val REGISTER_SHUFFLE_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
- buildConf("celeborn.rpc.registerShuffle.askTimeout")
- .categories("network")
- .version("0.2.0")
- .doc("Timeout for ask operations during register shuffle.")
- .timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("600s")
-
- val REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
- buildConf("celeborn.rpc.requestPartition.askTimeout")
- .categories("network")
- .version("0.2.0")
- .doc("Timeout for ask operations during request change partition
location, such as revive or split partition.")
- .timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("600s")
-
- val GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
- buildConf("celeborn.rpc.getReducerFileGroup.askTimeout")
- .categories("network")
- .version("0.2.0")
- .doc("Timeout for ask operations during get reducer file group.")
- .timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("600s")
-
val NETWORK_IO_MODE: ConfigEntry[String] =
buildConf("celeborn.<module>.io.mode")
.categories("network")
@@ -2079,14 +2060,15 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("10min")
- val PUSH_STAGE_END_TIMEOUT: ConfigEntry[Long] =
+ val PUSH_STAGE_END_TIMEOUT: OptionalConfigEntry[Long] =
buildConf("celeborn.push.stageEnd.timeout")
.withAlternative("rss.stage.end.timeout")
.categories("client")
- .doc("Timeout for StageEnd.")
+ .doc(s"Timeout for waiting StageEnd. " +
+ s"Default value should be `${RPC_ASK_TIMEOUT.key} *
(${COMMIT_FILE_REQUEST_MAX_RETRY.key} + 1)`.")
.version("0.2.0")
.timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("240s")
+ .createOptional
val PUSH_LIMIT_IN_FLIGHT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.push.limit.inFlight.timeout")
@@ -2207,6 +2189,42 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
+ val PUSH_DATA_RPC_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.push.data.rpc.timeout")
+ .withAlternative("rss.push.data.rpc.timeout")
+ .categories("client")
+ .version("0.2.0")
+ .doc("Timeout for a task to push data rpc message.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("120s")
+
+ val REGISTER_SHUFFLE_RPC_ASK_TIMEOUT: OptionalConfigEntry[Long] =
+ buildConf("celeborn.rpc.registerShuffle.askTimeout")
+ .categories("client")
+ .version("0.2.0")
+ .doc(s"Timeout for ask operations during register shuffle. " +
+ s"Default value should be `${RPC_ASK_TIMEOUT.key} *
(${RESERVE_SLOTS_MAX_RETRIES.key} + 1 + 1)`.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createOptional
+
+ val REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT: OptionalConfigEntry[Long] =
+ buildConf("celeborn.rpc.requestPartition.askTimeout")
+ .categories("client")
+ .version("0.2.0")
+ .doc(s"Timeout for ask operations during request change partition
location, such as revive or split partition. " +
+ s"Default value should be `${RPC_ASK_TIMEOUT.key} *
(${RESERVE_SLOTS_MAX_RETRIES.key} + 1)`.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createOptional
+
+ val GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT: OptionalConfigEntry[Long] =
+ buildConf("celeborn.rpc.getReducerFileGroup.askTimeout")
+ .categories("client")
+ .version("0.2.0")
+ .doc(s"Timeout for ask operations during get reducer file group. " +
+ s"Default value should be `${RPC_ASK_TIMEOUT.key} *
(${COMMIT_FILE_REQUEST_MAX_RETRY.key} + 1 + 1)`.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createOptional
+
val PORT_MAX_RETRY: ConfigEntry[Int] =
buildConf("celeborn.port.maxRetries")
.withAlternative("rss.master.port.maxretry")
@@ -2705,13 +2723,4 @@ object CelebornConf extends Logging {
.doc("The time before a cache item is removed.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("15s")
-
- val PUSH_DATA_RPC_TIMEOUT: ConfigEntry[Long] =
- buildConf("celeborn.push.data.rpc.timeout")
- .withAlternative("rss.push.data.rpc.timeout")
- .categories("client")
- .version("0.2.0")
- .doc("Timeout for a task to push data rpc message.")
- .timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("120s")
}
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 6ffa4542..8ec3aafc 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -36,12 +36,15 @@ license: |
| celeborn.push.retry.threads | 8 | Thread number to process shuffle re-send push data requests. | 0.2.0 |
| celeborn.push.sortMemory.threshold | 64m | When SortBasedPusher use memory over the threshold, will trigger push data. | 0.2.0 |
| celeborn.push.splitPartition.threads | 8 | Thread number to process shuffle split request in shuffle client. | 0.2.0 |
-| celeborn.push.stageEnd.timeout | 240s | Timeout for StageEnd. | 0.2.0 |
+| celeborn.push.stageEnd.timeout | <undefined> | Timeout for waiting StageEnd. Default value should be `celeborn.rpc.askTimeout * (celeborn.rpc.requestCommitFiles.maxRetries + 1)`. | 0.2.0 |
| celeborn.rpc.cache.concurrencyLevel | 32 | The number of write locks to update rpc cache. | 0.2.0 |
| celeborn.rpc.cache.expireTime | 15s | The time before a cache item is removed. | 0.2.0 |
| celeborn.rpc.cache.size | 256 | The max cache items count for rpc cache. | 0.2.0 |
+| celeborn.rpc.getReducerFileGroup.askTimeout | <undefined> | Timeout for ask operations during get reducer file group. Default value should be `celeborn.rpc.askTimeout * (celeborn.rpc.requestCommitFiles.maxRetries + 1 + 1)`. | 0.2.0 |
| celeborn.rpc.maxParallelism | 1024 | Max parallelism of client on sending RPC requests. | 0.2.0 |
+| celeborn.rpc.registerShuffle.askTimeout | <undefined> | Timeout for ask operations during register shuffle. Default value should be `celeborn.rpc.askTimeout * (celeborn.slots.reserve.maxRetries + 1 + 1)`. | 0.2.0 |
| celeborn.rpc.requestCommitFiles.maxRetries | 2 | Max retry times for requestCommitFiles RPC. | 1.0.0 |
+| celeborn.rpc.requestPartition.askTimeout | <undefined> | Timeout for ask operations during request change partition location, such as revive or split partition. Default value should be `celeborn.rpc.askTimeout * (celeborn.slots.reserve.maxRetries + 1)`. | 0.2.0 |
| celeborn.shuffle.batchHandleChangePartition.enabled | false | When true, LifecycleManager will handle change partition request in batch. Otherwise, LifecycleManager will process the requests one by one | 0.2.0 |
| celeborn.shuffle.batchHandleChangePartition.interval | 100ms | Interval for LifecycleManager to schedule handling change partition requests in batch. | 0.2.0 |
| celeborn.shuffle.batchHandleChangePartition.threads | 8 | Threads number for LifecycleManager to handle change partition request in batch. | 0.2.0 |
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index f26fda52..d6b01c62 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -39,10 +39,7 @@ license: |
| celeborn.port.maxRetries | 1 | When port is occupied, we will retry for max retry times. | 0.2.0 |
| celeborn.rpc.askTimeout | <value of celeborn.network.timeout> | Timeout for RPC ask operations. | 0.2.0 |
| celeborn.rpc.connect.threads | 64 | | 0.2.0 |
-| celeborn.rpc.getReducerFileGroup.askTimeout | 600s | Timeout for ask operations during get reducer file group. | 0.2.0 |
| celeborn.rpc.haClient.askTimeout | <value of celeborn.network.timeout> | Timeout for HA client RPC ask operations. | 0.2.0 |
| celeborn.rpc.lookupTimeout | 30s | Timeout for RPC lookup operations. | 0.2.0 |
-| celeborn.rpc.registerShuffle.askTimeout | 600s | Timeout for ask operations during register shuffle. | 0.2.0 |
-| celeborn.rpc.requestPartition.askTimeout | 600s | Timeout for ask operations during request change partition location, such as revive or split partition. | 0.2.0 |
| celeborn.shuffle.maxChunksBeingTransferred | 9223372036854775807 | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn.shuffle.io.maxRetries` and `celeborn.shuffle.io.retryWait`), if those limits are reached the task will fail with fetch failure. | 0.2.0 |
<!--end-include-->