This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new 15de4e544 [CELEBORN-1412] celeborn.client.rpc.*.askTimeout should fallback to celeborn.rpc.askTimeout
15de4e544 is described below
commit 15de4e5440f454a2a6ecdf708757b03ff082fd9a
Author: SteNicholas <[email protected]>
AuthorDate: Tue May 7 13:47:22 2024 +0800
[CELEBORN-1412] celeborn.client.rpc.*.askTimeout should fallback to celeborn.rpc.askTimeout
### What changes were proposed in this pull request?
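`celeborn.client.rpc.*.askTimeout` should fall back to `celeborn.rpc.askTimeout`. The corresponding `ConfigEntry` constants and `CelebornConf` accessors also drop their redundant `RPC`/`Rpc` infix (for example, `clientRpcRegisterShuffleRpcAskTimeout` becomes `clientRpcRegisterShuffleAskTimeout`).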
### Why are the changes needed?
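The options `celeborn.client.rpc.getReducerFileGroup.askTimeout`, `celeborn.client.rpc.registerShuffle.askTimeout` and `celeborn.client.rpc.requestPartition.askTimeout` in the `celeborn.client.rpc.*.askTimeout` series should fall back to `celeborn.rpc.askTimeout` instead of `celeborn.<module>.io.connectionTimeout`. This makes them consistent with the other options in the series, such as `celeborn.client.rpc.reserveSlots.askTimeout` and `celeborn.client.rpc.commitFiles.askTimeout`, which already fall back to `celeborn.rpc.askTimeout`.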
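The resolution order implied by `.withAlternative(...)` and `.fallbackConf(RPC_ASK_TIMEOUT)` can be sketched as a small, self-contained Scala model, assuming the usual precedence for such entries: an explicitly set key wins, then any alternative (legacy) key, then the value of `celeborn.rpc.askTimeout`, and finally that entry's own default. This is a hypothetical simplification, not Celeborn's `ConfigEntry`/`CelebornConf` API; `FallbackEntry`, `FallbackResolution`, `resolve` and the `rpcAskTimeoutDefault` argument are illustrative names, and any default passed in is arbitrary rather than Celeborn's real default.

```scala
// Hypothetical, simplified model of a config entry with alternative keys and a fallback.
// It is NOT Celeborn's ConfigEntry API; all names here are illustrative only.
final case class FallbackEntry(key: String, alternatives: Seq[String], fallbackKey: String)

object FallbackResolution {
  val RpcAskTimeoutKey = "celeborn.rpc.askTimeout"

  // The three options whose fallback changed in this commit.
  val RegisterShuffleAskTimeout: FallbackEntry = FallbackEntry(
    "celeborn.client.rpc.registerShuffle.askTimeout",
    Seq("celeborn.rpc.registerShuffle.askTimeout"), // legacy alternative key
    RpcAskTimeoutKey)
  val RequestPartitionAskTimeout: FallbackEntry = FallbackEntry(
    "celeborn.client.rpc.requestPartition.askTimeout", Seq.empty, RpcAskTimeoutKey)
  val GetReducerFileGroupAskTimeout: FallbackEntry = FallbackEntry(
    "celeborn.client.rpc.getReducerFileGroup.askTimeout", Seq.empty, RpcAskTimeoutKey)

  /** Primary key first, then alternatives in order, then the fallback key, then the default. */
  def resolve(settings: Map[String, String], entry: FallbackEntry, rpcAskTimeoutDefault: String): String =
    (entry.key +: entry.alternatives)
      .collectFirst { case k if settings.contains(k) => settings(k) }
      .orElse(settings.get(entry.fallbackKey))
      .getOrElse(rpcAskTimeoutDefault)
}
```

In this model, as after this change, a `celeborn.<module>.io.connectionTimeout` setting no longer influences any of the three options; only their own keys and `celeborn.rpc.askTimeout` do.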
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
Closes #2492 from SteNicholas/CELEBORN-1412.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit 1cd231f5e036f59f2a3981b683fcb34685a28e72)
Signed-off-by: SteNicholas <[email protected]>
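Besides bounding `askSync`, the request-partition timeout is also added to `System.currentTimeMillis()` in `ShuffleClientImpl` to compute a `dueTime` just before a push retry is submitted to `pushDataRetryPool`, so changing its fallback also shifts that deadline. A minimal sketch of the arithmetic, using a hypothetical `AskTimeout` stand-in that holds only the duration and property name that `CelebornConf` passes when constructing an `RpcTimeout`; `ReviveDeadline.dueTime` is likewise an illustrative name, and the current time is passed in so the result is deterministic.

```scala
import scala.concurrent.duration.FiniteDuration

// Hypothetical stand-in for RpcTimeout: a duration plus the config key it was read from.
final case class AskTimeout(duration: FiniteDuration, timeoutProp: String)

object ReviveDeadline {
  // Mirrors `System.currentTimeMillis() + timeout.duration().toMillis()` from the diff,
  // with "now" supplied by the caller instead of read from the clock.
  def dueTime(nowMillis: Long, timeout: AskTimeout): Long =
    nowMillis + timeout.duration.toMillis
}
```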
---
.../flink/readclient/FlinkShuffleClientImpl.java | 2 +-
.../apache/celeborn/client/ShuffleClientImpl.java | 26 ++++++---------
.../celeborn/client/ShuffleClientHelper.scala | 2 +-
.../org/apache/celeborn/common/CelebornConf.scala | 39 ++++++++++------------
.../apache/celeborn/common/CelebornConfSuite.scala | 6 ++--
docs/configuration/client.md | 6 ++--
docs/migration.md | 2 ++
7 files changed, 39 insertions(+), 44 deletions(-)
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index 6ced153c4..b92f82325 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -439,7 +439,7 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
PbChangeLocationResponse response =
lifecycleManagerRef.askSync(
ControlMessages.Revive$.MODULE$.apply(shuffleId, mapIds, requests),
- conf.clientRpcRequestPartitionLocationRpcAskTimeout(),
+ conf.clientRpcRequestPartitionLocationAskTimeout(),
ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));
// per partitionKey only serve single PartitionLocation in Client Cache.
PbChangeLocationPartitionInfo partitionInfo = response.getPartitionInfo(0);
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index d1d70764d..372bab262 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -456,9 +456,7 @@ public class ShuffleClientImpl extends ShuffleClient {
requests,
remainReviveTimes - 1,
System.currentTimeMillis()
- + conf.clientRpcRequestPartitionLocationRpcAskTimeout()
- .duration()
- .toMillis()));
+ + conf.clientRpcRequestPartitionLocationAskTimeout().duration().toMillis()));
}
}
@@ -479,7 +477,7 @@ public class ShuffleClientImpl extends ShuffleClient {
() ->
lifecycleManagerRef.askSync(
RegisterShuffle$.MODULE$.apply(shuffleId, numMappers, numPartitions),
- conf.clientRpcRegisterShuffleRpcAskTimeout(),
+ conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
}
@@ -502,7 +500,7 @@ public class ShuffleClientImpl extends ShuffleClient {
lifecycleManagerRef.askSync(
RegisterMapPartitionTask$.MODULE$.apply(
shuffleId, numMappers, mapId, attemptId, partitionId),
- conf.clientRpcRegisterShuffleRpcAskTimeout(),
+ conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
if (partitionLocationMap == null) {
@@ -538,7 +536,7 @@ public class ShuffleClientImpl extends ShuffleClient {
PbGetShuffleIdResponse pbGetShuffleIdResponse =
lifecycleManagerRef.askSync(
pbGetShuffleId,
- conf.clientRpcRegisterShuffleRpcAskTimeout(),
+ conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbGetShuffleIdResponse.class));
return pbGetShuffleIdResponse.getShuffleId();
});
@@ -554,7 +552,7 @@ public class ShuffleClientImpl extends ShuffleClient {
PbReportShuffleFetchFailureResponse pbReportShuffleFetchFailureResponse =
lifecycleManagerRef.askSync(
pbReportShuffleFetchFailure,
- conf.clientRpcRegisterShuffleRpcAskTimeout(),
+ conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbReportShuffleFetchFailureResponse.class));
return pbReportShuffleFetchFailureResponse.getSuccess();
}
@@ -744,7 +742,7 @@ public class ShuffleClientImpl extends ShuffleClient {
PbChangeLocationResponse response =
lifecycleManagerRef.askSync(
Revive$.MODULE$.apply(shuffleId, mapIds, requests),
- conf.clientRpcRequestPartitionLocationRpcAskTimeout(),
+ conf.clientRpcRequestPartitionLocationAskTimeout(),
ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));
for (int i = 0; i < response.getEndedMapIdCount(); i++) {
@@ -991,7 +989,7 @@ public class ShuffleClientImpl extends ShuffleClient {
reviveManager.addRequest(reviveRequest);
long dueTime =
System.currentTimeMillis()
- + conf.clientRpcRequestPartitionLocationRpcAskTimeout()
+ + conf.clientRpcRequestPartitionLocationAskTimeout()
.duration()
.toMillis();
pushDataRetryPool.submit(
@@ -1077,9 +1075,7 @@ public class ShuffleClientImpl extends ShuffleClient {
reviveManager.addRequest(reviveRequest);
long dueTime =
System.currentTimeMillis()
- + conf.clientRpcRequestPartitionLocationRpcAskTimeout()
- .duration()
- .toMillis();
+ + conf.clientRpcRequestPartitionLocationAskTimeout().duration().toMillis();
pushDataRetryPool.submit(
() ->
submitRetryPushData(
@@ -1385,7 +1381,7 @@ public class ShuffleClientImpl extends ShuffleClient {
requests,
remainReviveTimes,
System.currentTimeMillis()
- + conf.clientRpcRequestPartitionLocationRpcAskTimeout()
+ + conf.clientRpcRequestPartitionLocationAskTimeout()
.duration()
.toMillis()));
} else if (reason == StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue()) {
@@ -1465,7 +1461,7 @@ public class ShuffleClientImpl extends ShuffleClient {
requests,
remainReviveTimes - 1,
System.currentTimeMillis()
- + conf.clientRpcRequestPartitionLocationRpcAskTimeout()
+ + conf.clientRpcRequestPartitionLocationAskTimeout()
.duration()
.toMillis()));
} else {
@@ -1583,7 +1579,7 @@ public class ShuffleClientImpl extends ShuffleClient {
GetReducerFileGroupResponse response =
lifecycleManagerRef.askSync(
getReducerFileGroup,
- conf.clientRpcGetReducerFileGroupRpcAskTimeout(),
+ conf.clientRpcGetReducerFileGroupAskTimeout(),
ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
switch (response.status()) {
diff --git a/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala b/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala
index b94ba37c7..c1e8e3a77 100644
--- a/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala
@@ -40,7 +40,7 @@ object ShuffleClientHelper extends Logging {
shuffleLocs: ConcurrentHashMap[Integer, PartitionLocation]): Unit = {
endpointRef.ask[PbChangeLocationResponse](
req,
- conf.clientRpcRequestPartitionLocationRpcAskTimeout).onComplete {
+ conf.clientRpcRequestPartitionLocationAskTimeout).onComplete {
case Success(resp) =>
val partitionInfo = resp.getPartitionInfo(0)
val respStatus = Utils.toStatusCode(partitionInfo.getStatus)
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 6ff18b5a6..5c1cc0865 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -782,20 +782,20 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
get(CLIENT_RESERVE_SLOTS_RPC_TIMEOUT).milli,
CLIENT_RESERVE_SLOTS_RPC_TIMEOUT.key)
- def clientRpcRegisterShuffleRpcAskTimeout: RpcTimeout =
+ def clientRpcRegisterShuffleAskTimeout: RpcTimeout =
new RpcTimeout(
- get(CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT).milli,
- CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT.key)
+ get(CLIENT_RPC_REGISTER_SHUFFLE_ASK_TIMEOUT).milli,
+ CLIENT_RPC_REGISTER_SHUFFLE_ASK_TIMEOUT.key)
- def clientRpcRequestPartitionLocationRpcAskTimeout: RpcTimeout =
+ def clientRpcRequestPartitionLocationAskTimeout: RpcTimeout =
new RpcTimeout(
- get(CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT).milli,
- CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT.key)
+ get(CLIENT_RPC_REQUEST_PARTITION_LOCATION_ASK_TIMEOUT).milli,
+ CLIENT_RPC_REQUEST_PARTITION_LOCATION_ASK_TIMEOUT.key)
- def clientRpcGetReducerFileGroupRpcAskTimeout: RpcTimeout =
+ def clientRpcGetReducerFileGroupAskTimeout: RpcTimeout =
new RpcTimeout(
- get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).milli,
- CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key)
+ get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_ASK_TIMEOUT).milli,
+ CLIENT_RPC_GET_REDUCER_FILE_GROUP_ASK_TIMEOUT.key)
def clientRpcCommitFilesAskTimeout: RpcTimeout =
new RpcTimeout(
@@ -3767,7 +3767,7 @@ object CelebornConf extends Logging {
.doc("Timeout for LifecycleManager request reserve slots.")
.fallbackConf(RPC_ASK_TIMEOUT)
- val CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
+ val CLIENT_RPC_REGISTER_SHUFFLE_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.registerShuffle.askTimeout")
.withAlternative("celeborn.rpc.registerShuffle.askTimeout")
.categories("client")
@@ -3776,29 +3776,26 @@ object CelebornConf extends Logging {
s"During this process, there are two times for retry opportunities for requesting slots, " +
s"one request for establishing a connection with Worker and " +
s"`${CLIENT_RESERVE_SLOTS_MAX_RETRIES.key}` times for retry opportunities for reserving slots. " +
- s"User can customize this value according to your setting. " +
- s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
- .fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
+ s"User can customize this value according to your setting.")
+ .fallbackConf(RPC_ASK_TIMEOUT)
- val CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
+ val CLIENT_RPC_REQUEST_PARTITION_LOCATION_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.requestPartition.askTimeout")
.categories("client")
.version("0.2.0")
.doc(s"Timeout for ask operations during requesting change partition location, such as reviving or splitting partition. " +
s"During this process, there are `${CLIENT_RESERVE_SLOTS_MAX_RETRIES.key}` times for retry opportunities for reserving slots. " +
- s"User can customize this value according to your setting. " +
- s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
- .fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
+ s"User can customize this value according to your setting.")
+ .fallbackConf(RPC_ASK_TIMEOUT)
- val CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
+ val CLIENT_RPC_GET_REDUCER_FILE_GROUP_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.getReducerFileGroup.askTimeout")
.categories("client")
.version("0.2.0")
.doc(s"Timeout for ask operations during getting reducer file group information. " +
s"During this process, there are `${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for committing files " +
- s"and 1 times for releasing slots request. User can customize this value according to your setting. " +
- s"By default, the value is the max timeout value `${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
- .fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
+ s"and 1 times for releasing slots request. User can customize this value according to your setting.")
+ .fallbackConf(RPC_ASK_TIMEOUT)
val CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.commitFiles.askTimeout")
diff --git a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 736a44619..9f398fcd8 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -190,9 +190,9 @@ class CelebornConfSuite extends CelebornFunSuite {
assert(conf.networkTimeout.duration.toMillis == 20000L)
assert(conf.networkIoConnectionTimeoutMs("data") == 20000L)
assert(conf.clientPushStageEndTimeout == 20000L)
- assert(conf.clientRpcRegisterShuffleRpcAskTimeout.duration.toMillis == 20000L)
- assert(conf.clientRpcRequestPartitionLocationRpcAskTimeout.duration.toMillis == 20000L)
- assert(conf.clientRpcGetReducerFileGroupRpcAskTimeout.duration.toMillis == 20000L)
+ assert(conf.clientRpcRegisterShuffleAskTimeout.duration.toMillis == 1000L)
+ assert(conf.clientRpcRequestPartitionLocationAskTimeout.duration.toMillis == 1000L)
+ assert(conf.clientRpcGetReducerFileGroupAskTimeout.duration.toMillis == 1000L)
assert(conf.networkConnectTimeout.duration.toMillis == 2000L)
assert(conf.networkIoConnectTimeoutMs("data") == 2000L)
}
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 929d70688..304a84e60 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -78,10 +78,10 @@ license: |
| celeborn.client.rpc.cache.expireTime | 15s | The time before a cache item is removed. | 0.3.0 | celeborn.rpc.cache.expireTime |
| celeborn.client.rpc.cache.size | 256 | The max cache items count for rpc cache. | 0.3.0 | celeborn.rpc.cache.size |
| celeborn.client.rpc.commitFiles.askTimeout | <value of celeborn.rpc.askTimeout> | Timeout for CommitHandler commit files. | 0.4.1 | |
-| celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for ask operations during getting reducer file group information. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.co [...]
+| celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of celeborn.rpc.askTimeout> | Timeout for ask operations during getting reducer file group information. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing files and 1 times for releasing slots request. User can customize this value according to your setting. | 0.2.0 | |
| celeborn.client.rpc.maxRetries | 3 | Max RPC retry times in LifecycleManager. | 0.3.2 | |
-| celeborn.client.rpc.registerShuffle.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for ask operations during register shuffle. During this process, there are two times for retry opportunities for requesting slots, one request for establishing a connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. By default, the value is the m [...]
-| celeborn.client.rpc.requestPartition.askTimeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for ask operations during requesting change partition location, such as reviving or splitting partition. During this process, there are `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTim [...]
+| celeborn.client.rpc.registerShuffle.askTimeout | <value of celeborn.rpc.askTimeout> | Timeout for ask operations during register shuffle. During this process, there are two times for retry opportunities for requesting slots, one request for establishing a connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. | 0.3.0 | celeborn.rpc.registerShuffle.askTimeout |
+| celeborn.client.rpc.requestPartition.askTimeout | <value of celeborn.rpc.askTimeout> | Timeout for ask operations during requesting change partition location, such as reviving or splitting partition. During this process, there are `celeborn.client.reserveSlots.maxRetries` times for retry opportunities for reserving slots. User can customize this value according to your setting. | 0.2.0 | |
| celeborn.client.rpc.reserveSlots.askTimeout | <value of celeborn.rpc.askTimeout> | Timeout for LifecycleManager request reserve slots. | 0.3.0 | |
| celeborn.client.rpc.shared.threads | 16 | Number of shared rpc threads in LifecycleManager. | 0.3.2 | |
| celeborn.client.shuffle.batchHandleChangePartition.interval | 100ms | Interval for LifecycleManager to schedule handling change partition requests in batch. | 0.3.0 | celeborn.shuffle.batchHandleChangePartition.interval |
diff --git a/docs/migration.md b/docs/migration.md
index 6128357e7..21f29509f 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -26,6 +26,8 @@ license: |
- Since 0.4.1, Celeborn master adds a limit to the estimated partition size used for computing worker slots.
This size is now constrained within the range specified by `celeborn.master.estimatedPartitionSize.minSize` and `celeborn.master.estimatedPartitionSize.maxSize`.
+- Since 0.4.1, Celeborn changed the fallback configuration of `celeborn.client.rpc.getReducerFileGroup.askTimeout`, `celeborn.client.rpc.registerShuffle.askTimeout` and `celeborn.client.rpc.requestPartition.askTimeout` from `celeborn.<module>.io.connectionTimeout` to `celeborn.rpc.askTimeout`.
+
## Upgrading from 0.3 to 0.4
- Since 0.4.0, Celeborn won't be compatible with Celeborn client that versions below 0.3.0.
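When upgrading to 0.4.1, only the three options above that are left unset pick up the new `celeborn.rpc.askTimeout` fallback. A minimal sketch of a pre-upgrade check over a plain key/value view of a client configuration; `FallbackMigrationCheck` and `nowFollowRpcAskTimeout` are hypothetical helpers, and the legacy key `celeborn.rpc.registerShuffle.askTimeout` is taken from the alternative listed for `celeborn.client.rpc.registerShuffle.askTimeout` in the configuration table.

```scala
// Hypothetical helper, not part of Celeborn: reports which of the options whose
// fallback changed in 0.4.1 are not set explicitly and will now follow celeborn.rpc.askTimeout.
object FallbackMigrationCheck {
  // Each changed option paired with its alternative (legacy) keys, if any.
  val ChangedOptions: Seq[(String, Seq[String])] = Seq(
    "celeborn.client.rpc.getReducerFileGroup.askTimeout" -> Seq.empty,
    "celeborn.client.rpc.registerShuffle.askTimeout" -> Seq("celeborn.rpc.registerShuffle.askTimeout"),
    "celeborn.client.rpc.requestPartition.askTimeout" -> Seq.empty)

  /** Options set under none of their keys, in the order listed above. */
  def nowFollowRpcAskTimeout(settings: Map[String, String]): Seq[String] =
    ChangedOptions.collect { case (key, alts) if !(key +: alts).exists(settings.contains) => key }
}
```

Setting the reported keys explicitly keeps their timeouts independent of `celeborn.rpc.askTimeout`.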