This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 69defcad7 [CELEBORN-1021] Celeborn support arbitrary Ratis configs and
client rpc timeout
69defcad7 is described below
commit 69defcad7f9423c9c24d2d22ead856b4225671c6
Author: mingji <[email protected]>
AuthorDate: Wed Oct 18 10:26:11 2023 +0800
[CELEBORN-1021] Celeborn support arbitrary Ratis configs and client rpc
timeout
### What changes were proposed in this pull request?
1. To support arbitrary Ratis configs
2. To support Ratis client rpc timeout
### Why are the changes needed?
After some digging, I found that Celeborn never changed the default
configuration of the Ratis client's timeout.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster.
Closes #1969 from FMX/CELEBORN-1021.
Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 21 +++++++++++++++++++++
docs/migration.md | 6 ++++++
.../deploy/master/clustermeta/ha/HARaftServer.java | 13 +++++++++++++
3 files changed, 40 insertions(+)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index aa39db44f..877a2559e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -614,6 +614,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def haMasterRatisRetryCacheExpiryTime: Long =
get(HA_MASTER_RATIS_SERVER_RETRY_CACHE_EXPIRY_TIME)
def haMasterRatisRpcTimeoutMin: Long = get(HA_MASTER_RATIS_RPC_TIMEOUT_MIN)
def haMasterRatisRpcTimeoutMax: Long = get(HA_MASTER_RATIS_RPC_TIMEOUT_MAX)
+ def haMasterRatisClientRpcTimeout: Long =
get(HA_MASTER_RATIS_CLIENT_RPC_TIMEOUT)
+ def haMasterRatisClientRpcWatchTimeout: Long =
get(HA_MASTER_RATIS_CLIENT_RPC_WATCH_TIMEOUT)
def haMasterRatisFirstElectionTimeoutMin: Long =
get(HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MIN)
def haMasterRatisFirstElectionTimeoutMax: Long =
get(HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MAX)
def haMasterRatisNotificationNoLeaderTimeout: Long =
@@ -625,6 +627,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def haMasterRatisSnapshotAutoTriggerThreshold: Long =
get(HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD)
def haMasterRatisSnapshotRetentionFileNum: Int =
get(HA_MASTER_RATIS_SNAPSHOT_RETENTION_FILE_NUM)
+ def haRatisCustomConfigs: JMap[String, String] = {
+ settings.asScala.filter(_._1.startsWith("celeborn.ratis")).toMap.asJava
+ }
// //////////////////////////////////////////////////////
// Worker //
@@ -1807,6 +1812,22 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("5s")
+ val HA_MASTER_RATIS_CLIENT_RPC_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.master.ha.ratis.raft.client.rpc.timeout")
+ .internal
+ .categories("ha")
+ .version("0.3.2")
+ .timeConf(TimeUnit.SECONDS)
+ .createWithDefaultString("10s")
+
+ val HA_MASTER_RATIS_CLIENT_RPC_WATCH_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.master.ha.ratis.raft.client.rpc.watch.timeout")
+ .internal
+ .categories("ha")
+ .version("0.3.2")
+ .timeConf(TimeUnit.SECONDS)
+ .createWithDefaultString("20s")
+
val HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MIN: ConfigEntry[Long] =
buildConf("celeborn.master.ha.ratis.first.election.timeout.min")
.withAlternative("celeborn.ha.master.ratis.first.election.timeout.min")
diff --git a/docs/migration.md b/docs/migration.md
index faaff3d09..62320b997 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -42,6 +42,12 @@ license: |
- Since 0.4.0, Celeborn deprecate `celeborn.worker.storage.baseDir.prefix` and
`celeborn.worker.storage.baseDir.number`.
Please use `celeborn.worker.storage.dirs` instead.
+## Upgrading from 0.3.1 to 0.3.2
+
+- Since 0.3.2, Celeborn changed the default value of
`raft.client.rpc.request.timeout` from `3s` to `10s`.
+
+- Since 0.3.2, Celeborn changed the default value of
`raft.client.rpc.watch.request.timeout` from `10s` to `20s`.
+
## Upgrading from 0.3.0 to 0.3.1
- Since 0.3.1, Celeborn changed the default value of
`celeborn.worker.directMemoryRatioToResume` from `0.5` to `0.7`.
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
index 313ee2140..4c2573955 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
@@ -32,6 +32,7 @@ import scala.Tuple2;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.netty.NettyConfigKeys;
@@ -324,6 +325,14 @@ public class HARaftServer {
RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMin(properties,
firstElectionTimeoutMin);
RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(properties,
firstElectionTimeoutMax);
+ // Set the rpc client timeout
+ TimeDuration clientRpcTimeout =
+ TimeDuration.valueOf(conf.haMasterRatisClientRpcTimeout(),
TimeUnit.SECONDS);
+ TimeDuration clientRpcWatchTimeout =
+ TimeDuration.valueOf(conf.haMasterRatisClientRpcWatchTimeout(),
TimeUnit.SECONDS);
+ RaftClientConfigKeys.Rpc.setRequestTimeout(properties, clientRpcTimeout);
+ RaftClientConfigKeys.Rpc.setWatchRequestTimeout(properties,
clientRpcWatchTimeout);
+
// Set the number of maximum cached segments
RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2);
@@ -348,6 +357,10 @@ public class HARaftServer {
long snapshotAutoTriggerThreshold =
conf.haMasterRatisSnapshotAutoTriggerThreshold();
RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties,
snapshotAutoTriggerThreshold);
+ for (Map.Entry<String, String> ratisEntry :
conf.haRatisCustomConfigs().entrySet()) {
+ properties.set(ratisEntry.getKey().replace("celeborn.ratis.", ""),
ratisEntry.getValue());
+ }
+
return properties;
}