This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1cd231f5e [CELEBORN-1412] celeborn.client.rpc.*.askTimeout should
fallback to celeborn.rpc.askTimeout
1cd231f5e is described below
commit 1cd231f5e036f59f2a3981b683fcb34685a28e72
Author: SteNicholas <[email protected]>
AuthorDate: Tue May 7 13:47:22 2024 +0800
[CELEBORN-1412] celeborn.client.rpc.*.askTimeout should fallback to
celeborn.rpc.askTimeout
### What changes were proposed in this pull request?
`celeborn.client.rpc.*.askTimeout` should fall back to
`celeborn.rpc.askTimeout`.
### Why are the changes needed?
The `celeborn.client.rpc.*.askTimeout` config options should fall back
to `celeborn.rpc.askTimeout` instead of
`celeborn.<module>.io.connectionTimeout`. This applies to
`celeborn.client.rpc.getReducerFileGroup.askTimeout`,
`celeborn.client.rpc.registerShuffle.askTimeout` and
`celeborn.client.rpc.requestPartition.askTimeout`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
Closes #2492 from SteNicholas/CELEBORN-1412.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../flink/readclient/FlinkShuffleClientImpl.java | 2 +-
.../apache/celeborn/client/ShuffleClientImpl.java | 28 +++++++---------
.../org/apache/celeborn/common/CelebornConf.scala | 39 ++++++++++------------
.../apache/celeborn/common/CelebornConfSuite.scala | 6 ++--
docs/configuration/client.md | 6 ++--
docs/migration.md | 2 ++
6 files changed, 39 insertions(+), 44 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index bb599c93e..3d69613f8 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -439,7 +439,7 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
PbChangeLocationResponse response =
lifecycleManagerRef.askSync(
ControlMessages.Revive$.MODULE$.apply(shuffleId, mapIds, requests),
- conf.clientRpcRequestPartitionLocationRpcAskTimeout(),
+ conf.clientRpcRequestPartitionLocationAskTimeout(),
ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));
// per partitionKey only serve single PartitionLocation in Client Cache.
PbChangeLocationPartitionInfo partitionInfo = response.getPartitionInfo(0);
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 11f13ea36..08377d6fb 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -225,7 +225,7 @@ public class ShuffleClientImpl extends ShuffleClient {
PbApplicationMeta pbApplicationMeta =
lifecycleManagerRef.askSync(
pbApplicationMetaRequest,
- conf.clientRpcRegisterShuffleRpcAskTimeout(),
+ conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbApplicationMeta.class));
logger.info("Initializing data client factory for secured {}.",
appUniqueId);
List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
@@ -492,9 +492,7 @@ public class ShuffleClientImpl extends ShuffleClient {
requests,
remainReviveTimes - 1,
System.currentTimeMillis()
- + conf.clientRpcRequestPartitionLocationRpcAskTimeout()
- .duration()
- .toMillis()));
+ +
conf.clientRpcRequestPartitionLocationAskTimeout().duration().toMillis()));
}
}
@@ -515,7 +513,7 @@ public class ShuffleClientImpl extends ShuffleClient {
() ->
lifecycleManagerRef.askSync(
RegisterShuffle$.MODULE$.apply(shuffleId, numMappers,
numPartitions),
- conf.clientRpcRegisterShuffleRpcAskTimeout(),
+ conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
}
@@ -538,7 +536,7 @@ public class ShuffleClientImpl extends ShuffleClient {
lifecycleManagerRef.askSync(
RegisterMapPartitionTask$.MODULE$.apply(
shuffleId, numMappers, mapId, attemptId, partitionId),
- conf.clientRpcRegisterShuffleRpcAskTimeout(),
+ conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbRegisterShuffleResponse.class)));
if (partitionLocationMap == null) {
@@ -574,7 +572,7 @@ public class ShuffleClientImpl extends ShuffleClient {
PbGetShuffleIdResponse pbGetShuffleIdResponse =
lifecycleManagerRef.askSync(
pbGetShuffleId,
- conf.clientRpcRegisterShuffleRpcAskTimeout(),
+ conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbGetShuffleIdResponse.class));
return pbGetShuffleIdResponse.getShuffleId();
});
@@ -590,7 +588,7 @@ public class ShuffleClientImpl extends ShuffleClient {
PbReportShuffleFetchFailureResponse pbReportShuffleFetchFailureResponse =
lifecycleManagerRef.askSync(
pbReportShuffleFetchFailure,
- conf.clientRpcRegisterShuffleRpcAskTimeout(),
+ conf.clientRpcRegisterShuffleAskTimeout(),
ClassTag$.MODULE$.apply(PbReportShuffleFetchFailureResponse.class));
return pbReportShuffleFetchFailureResponse.getSuccess();
}
@@ -780,7 +778,7 @@ public class ShuffleClientImpl extends ShuffleClient {
PbChangeLocationResponse response =
lifecycleManagerRef.askSync(
Revive$.MODULE$.apply(shuffleId, mapIds, requests),
- conf.clientRpcRequestPartitionLocationRpcAskTimeout(),
+ conf.clientRpcRequestPartitionLocationAskTimeout(),
ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));
for (int i = 0; i < response.getEndedMapIdCount(); i++) {
@@ -1039,7 +1037,7 @@ public class ShuffleClientImpl extends ShuffleClient {
reviveManager.addRequest(reviveRequest);
long dueTime =
System.currentTimeMillis()
- +
conf.clientRpcRequestPartitionLocationRpcAskTimeout()
+ + conf.clientRpcRequestPartitionLocationAskTimeout()
.duration()
.toMillis();
pushDataRetryPool.submit(
@@ -1125,9 +1123,7 @@ public class ShuffleClientImpl extends ShuffleClient {
reviveManager.addRequest(reviveRequest);
long dueTime =
System.currentTimeMillis()
- + conf.clientRpcRequestPartitionLocationRpcAskTimeout()
- .duration()
- .toMillis();
+ +
conf.clientRpcRequestPartitionLocationAskTimeout().duration().toMillis();
pushDataRetryPool.submit(
() ->
submitRetryPushData(
@@ -1407,7 +1403,7 @@ public class ShuffleClientImpl extends ShuffleClient {
requests,
remainReviveTimes,
System.currentTimeMillis()
- +
conf.clientRpcRequestPartitionLocationRpcAskTimeout()
+ +
conf.clientRpcRequestPartitionLocationAskTimeout()
.duration()
.toMillis()));
} else if (reason ==
StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue()) {
@@ -1487,7 +1483,7 @@ public class ShuffleClientImpl extends ShuffleClient {
requests,
remainReviveTimes - 1,
System.currentTimeMillis()
- +
conf.clientRpcRequestPartitionLocationRpcAskTimeout()
+ +
conf.clientRpcRequestPartitionLocationAskTimeout()
.duration()
.toMillis()));
} else {
@@ -1606,7 +1602,7 @@ public class ShuffleClientImpl extends ShuffleClient {
GetReducerFileGroupResponse response =
lifecycleManagerRef.askSync(
getReducerFileGroup,
- conf.clientRpcGetReducerFileGroupRpcAskTimeout(),
+ conf.clientRpcGetReducerFileGroupAskTimeout(),
ClassTag$.MODULE$.apply(GetReducerFileGroupResponse.class));
switch (response.status()) {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index edc6f25c3..d43dab053 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -909,20 +909,20 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
get(CLIENT_RESERVE_SLOTS_RPC_TIMEOUT).milli,
CLIENT_RESERVE_SLOTS_RPC_TIMEOUT.key)
- def clientRpcRegisterShuffleRpcAskTimeout: RpcTimeout =
+ def clientRpcRegisterShuffleAskTimeout: RpcTimeout =
new RpcTimeout(
- get(CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT).milli,
- CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT.key)
+ get(CLIENT_RPC_REGISTER_SHUFFLE_ASK_TIMEOUT).milli,
+ CLIENT_RPC_REGISTER_SHUFFLE_ASK_TIMEOUT.key)
- def clientRpcRequestPartitionLocationRpcAskTimeout: RpcTimeout =
+ def clientRpcRequestPartitionLocationAskTimeout: RpcTimeout =
new RpcTimeout(
- get(CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT).milli,
- CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT.key)
+ get(CLIENT_RPC_REQUEST_PARTITION_LOCATION_ASK_TIMEOUT).milli,
+ CLIENT_RPC_REQUEST_PARTITION_LOCATION_ASK_TIMEOUT.key)
- def clientRpcGetReducerFileGroupRpcAskTimeout: RpcTimeout =
+ def clientRpcGetReducerFileGroupAskTimeout: RpcTimeout =
new RpcTimeout(
- get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT).milli,
- CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT.key)
+ get(CLIENT_RPC_GET_REDUCER_FILE_GROUP_ASK_TIMEOUT).milli,
+ CLIENT_RPC_GET_REDUCER_FILE_GROUP_ASK_TIMEOUT.key)
def clientRpcCommitFilesAskTimeout: RpcTimeout =
new RpcTimeout(
@@ -4215,7 +4215,7 @@ object CelebornConf extends Logging {
.doc("Timeout for LifecycleManager request reserve slots.")
.fallbackConf(RPC_ASK_TIMEOUT)
- val CLIENT_RPC_REGISTER_SHUFFLE_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
+ val CLIENT_RPC_REGISTER_SHUFFLE_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.registerShuffle.askTimeout")
.withAlternative("celeborn.rpc.registerShuffle.askTimeout")
.categories("client")
@@ -4224,29 +4224,26 @@ object CelebornConf extends Logging {
s"During this process, there are two times for retry opportunities for
requesting slots, " +
s"one request for establishing a connection with Worker and " +
s"`${CLIENT_RESERVE_SLOTS_MAX_RETRIES.key}` times for retry
opportunities for reserving slots. " +
- s"User can customize this value according to your setting. " +
- s"By default, the value is the max timeout value
`${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
- .fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
+ s"User can customize this value according to your setting.")
+ .fallbackConf(RPC_ASK_TIMEOUT)
- val CLIENT_RPC_REQUEST_PARTITION_LOCATION_RPC_ASK_TIMEOUT: ConfigEntry[Long]
=
+ val CLIENT_RPC_REQUEST_PARTITION_LOCATION_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.requestPartition.askTimeout")
.categories("client")
.version("0.2.0")
.doc(s"Timeout for ask operations during requesting change partition
location, such as reviving or splitting partition. " +
s"During this process, there are
`${CLIENT_RESERVE_SLOTS_MAX_RETRIES.key}` times for retry opportunities for
reserving slots. " +
- s"User can customize this value according to your setting. " +
- s"By default, the value is the max timeout value
`${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
- .fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
+ s"User can customize this value according to your setting.")
+ .fallbackConf(RPC_ASK_TIMEOUT)
- val CLIENT_RPC_GET_REDUCER_FILE_GROUP_RPC_ASK_TIMEOUT: ConfigEntry[Long] =
+ val CLIENT_RPC_GET_REDUCER_FILE_GROUP_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.getReducerFileGroup.askTimeout")
.categories("client")
.version("0.2.0")
.doc(s"Timeout for ask operations during getting reducer file group
information. " +
s"During this process, there are
`${CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY.key}` times for retry opportunities for
committing files " +
- s"and 1 times for releasing slots request. User can customize this
value according to your setting. " +
- s"By default, the value is the max timeout value
`${NETWORK_IO_CONNECTION_TIMEOUT.key}`.")
- .fallbackConf(NETWORK_IO_CONNECTION_TIMEOUT)
+ s"and 1 times for releasing slots request. User can customize this
value according to your setting.")
+ .fallbackConf(RPC_ASK_TIMEOUT)
val CLIENT_RPC_COMMIT_FILES_ASK_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.rpc.commitFiles.askTimeout")
diff --git
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 6f05f6b65..08c673599 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -191,9 +191,9 @@ class CelebornConfSuite extends CelebornFunSuite {
assert(conf.networkTimeout.duration.toMillis == 20000L)
assert(conf.networkIoConnectionTimeoutMs("data") == 20000L)
assert(conf.clientPushStageEndTimeout == 20000L)
- assert(conf.clientRpcRegisterShuffleRpcAskTimeout.duration.toMillis ==
20000L)
-
assert(conf.clientRpcRequestPartitionLocationRpcAskTimeout.duration.toMillis ==
20000L)
- assert(conf.clientRpcGetReducerFileGroupRpcAskTimeout.duration.toMillis ==
20000L)
+ assert(conf.clientRpcRegisterShuffleAskTimeout.duration.toMillis == 1000L)
+ assert(conf.clientRpcRequestPartitionLocationAskTimeout.duration.toMillis
== 1000L)
+ assert(conf.clientRpcGetReducerFileGroupAskTimeout.duration.toMillis ==
1000L)
assert(conf.networkConnectTimeout.duration.toMillis == 2000L)
assert(conf.networkIoConnectTimeoutMs("data") == 2000L)
}
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 7ad3dbc5b..314cd90ae 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -75,10 +75,10 @@ license: |
| celeborn.client.rpc.cache.expireTime | 15s | false | The time before a cache
item is removed. | 0.3.0 | celeborn.rpc.cache.expireTime |
| celeborn.client.rpc.cache.size | 256 | false | The max cache items count for
rpc cache. | 0.3.0 | celeborn.rpc.cache.size |
| celeborn.client.rpc.commitFiles.askTimeout | <value of
celeborn.rpc.askTimeout> | false | Timeout for CommitHandler commit files. |
0.4.1 | |
-| celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of
celeborn.<module>.io.connectionTimeout> | false | Timeout for ask
operations during getting reducer file group information. During this process,
there are `celeborn.client.requestCommitFiles.maxRetries` times for retry
opportunities for committing files and 1 times for releasing slots request.
User can customize this value according to your setting. By default, the value
is the max timeout value `celeborn.<modul [...]
+| celeborn.client.rpc.getReducerFileGroup.askTimeout | <value of
celeborn.rpc.askTimeout> | false | Timeout for ask operations during getting
reducer file group information. During this process, there are
`celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities
for committing files and 1 times for releasing slots request. User can
customize this value according to your setting. | 0.2.0 | |
| celeborn.client.rpc.maxRetries | 3 | false | Max RPC retry times in
LifecycleManager. | 0.3.2 | |
-| celeborn.client.rpc.registerShuffle.askTimeout | <value of
celeborn.<module>.io.connectionTimeout> | false | Timeout for ask
operations during register shuffle. During this process, there are two times
for retry opportunities for requesting slots, one request for establishing a
connection with Worker and `celeborn.client.reserveSlots.maxRetries` times for
retry opportunities for reserving slots. User can customize this value
according to your setting. By default, the value [...]
-| celeborn.client.rpc.requestPartition.askTimeout | <value of
celeborn.<module>.io.connectionTimeout> | false | Timeout for ask
operations during requesting change partition location, such as reviving or
splitting partition. During this process, there are
`celeborn.client.reserveSlots.maxRetries` times for retry opportunities for
reserving slots. User can customize this value according to your setting. By
default, the value is the max timeout value `celeborn.<module>.io.conne [...]
+| celeborn.client.rpc.registerShuffle.askTimeout | <value of
celeborn.rpc.askTimeout> | false | Timeout for ask operations during
register shuffle. During this process, there are two times for retry
opportunities for requesting slots, one request for establishing a connection
with Worker and `celeborn.client.reserveSlots.maxRetries` times for retry
opportunities for reserving slots. User can customize this value according to
your setting. | 0.3.0 | celeborn.rpc.registerShuffle.askT [...]
+| celeborn.client.rpc.requestPartition.askTimeout | <value of
celeborn.rpc.askTimeout> | false | Timeout for ask operations during
requesting change partition location, such as reviving or splitting partition.
During this process, there are `celeborn.client.reserveSlots.maxRetries` times
for retry opportunities for reserving slots. User can customize this value
according to your setting. | 0.2.0 | |
| celeborn.client.rpc.reserveSlots.askTimeout | <value of
celeborn.rpc.askTimeout> | false | Timeout for LifecycleManager request
reserve slots. | 0.3.0 | |
| celeborn.client.rpc.shared.threads | 16 | false | Number of shared rpc
threads in LifecycleManager. | 0.3.2 | |
| celeborn.client.shuffle.batchHandleChangePartition.interval | 100ms | false
| Interval for LifecycleManager to schedule handling change partition requests
in batch. | 0.3.0 | celeborn.shuffle.batchHandleChangePartition.interval |
diff --git a/docs/migration.md b/docs/migration.md
index 3c3500a8b..0fbe9accf 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -45,6 +45,8 @@ license: |
- Since 0.4.1, Celeborn master adds a limit to the estimated partition size
used for computing worker slots.
This size is now constrained within the range specified by
`celeborn.master.estimatedPartitionSize.minSize` and
`celeborn.master.estimatedPartitionSize.maxSize`.
+- Since 0.4.1, Celeborn changed the fallback configuration of
`celeborn.client.rpc.getReducerFileGroup.askTimeout`,
`celeborn.client.rpc.registerShuffle.askTimeout` and
`celeborn.client.rpc.requestPartition.askTimeout` from
`celeborn.<module>.io.connectionTimeout` to `celeborn.rpc.askTimeout`.
+
## Upgrading from 0.3 to 0.4
- Since 0.4.0, Celeborn won't be compatible with Celeborn client that versions
below 0.3.0.