This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 69defcad7 [CELEBORN-1021] Celeborn support arbitrary Ratis configs and 
client rpc timeout
69defcad7 is described below

commit 69defcad7f9423c9c24d2d22ead856b4225671c6
Author: mingji <[email protected]>
AuthorDate: Wed Oct 18 10:26:11 2023 +0800

    [CELEBORN-1021] Celeborn support arbitrary Ratis configs and client rpc 
timeout
    
    ### What changes were proposed in this pull request?
    1. To support arbitrary Ratis configs
    2. To support Ratis client rpc timeout
    
    ### Why are the changes needed?
    After some digging, I found that Celeborn never changed the default config 
of the Ratis client's timeout.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    GA and cluster.
    
    Closes #1969 from FMX/CELEBORN-1021.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala   | 21 +++++++++++++++++++++
 docs/migration.md                                   |  6 ++++++
 .../deploy/master/clustermeta/ha/HARaftServer.java  | 13 +++++++++++++
 3 files changed, 40 insertions(+)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index aa39db44f..877a2559e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -614,6 +614,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def haMasterRatisRetryCacheExpiryTime: Long = 
get(HA_MASTER_RATIS_SERVER_RETRY_CACHE_EXPIRY_TIME)
   def haMasterRatisRpcTimeoutMin: Long = get(HA_MASTER_RATIS_RPC_TIMEOUT_MIN)
   def haMasterRatisRpcTimeoutMax: Long = get(HA_MASTER_RATIS_RPC_TIMEOUT_MAX)
+  def haMasterRatisClientRpcTimeout: Long = 
get(HA_MASTER_RATIS_CLIENT_RPC_TIMEOUT)
+  def haMasterRatisClientRpcWatchTimeout: Long = 
get(HA_MASTER_RATIS_CLIENT_RPC_WATCH_TIMEOUT)
   def haMasterRatisFirstElectionTimeoutMin: Long = 
get(HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MIN)
   def haMasterRatisFirstElectionTimeoutMax: Long = 
get(HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MAX)
   def haMasterRatisNotificationNoLeaderTimeout: Long =
@@ -625,6 +627,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def haMasterRatisSnapshotAutoTriggerThreshold: Long =
     get(HA_MASTER_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD)
   def haMasterRatisSnapshotRetentionFileNum: Int = 
get(HA_MASTER_RATIS_SNAPSHOT_RETENTION_FILE_NUM)
+  def haRatisCustomConfigs: JMap[String, String] = {
+    settings.asScala.filter(_._1.startsWith("celeborn.ratis")).toMap.asJava
+  }
 
   // //////////////////////////////////////////////////////
   //                      Worker                         //
@@ -1807,6 +1812,22 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.SECONDS)
       .createWithDefaultString("5s")
 
+  val HA_MASTER_RATIS_CLIENT_RPC_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.master.ha.ratis.raft.client.rpc.timeout")
+      .internal
+      .categories("ha")
+      .version("0.3.2")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("10s")
+
+  val HA_MASTER_RATIS_CLIENT_RPC_WATCH_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.master.ha.ratis.raft.client.rpc.watch.timeout")
+      .internal
+      .categories("ha")
+      .version("0.3.2")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("20s")
+
   val HA_MASTER_RATIS_FIRSTELECTION_TIMEOUT_MIN: ConfigEntry[Long] =
     buildConf("celeborn.master.ha.ratis.first.election.timeout.min")
       .withAlternative("celeborn.ha.master.ratis.first.election.timeout.min")
diff --git a/docs/migration.md b/docs/migration.md
index faaff3d09..62320b997 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -42,6 +42,12 @@ license: |
 - Since 0.4.0, Celeborn deprecate `celeborn.worker.storage.baseDir.prefix` and 
`celeborn.worker.storage.baseDir.number`.
   Please use `celeborn.worker.storage.dirs` instead.
 
+## Upgrading from 0.3.1 to 0.3.2
+
+- Since 0.3.2, Celeborn changed the default value of 
`raft.client.rpc.request.timeout` from `3s` to `10s`.
+
+- Since 0.3.2, Celeborn changed the default value of 
`raft.client.rpc.watch.request.timeout` from `10s` to `20s`.
+
 ## Upgrading from 0.3.0 to 0.3.1
 
 - Since 0.3.1, Celeborn changed the default value of 
`celeborn.worker.directMemoryRatioToResume` from `0.5` to `0.7`.
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
index 313ee2140..4c2573955 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java
@@ -32,6 +32,7 @@ import scala.Tuple2;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.netty.NettyConfigKeys;
@@ -324,6 +325,14 @@ public class HARaftServer {
     RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMin(properties, 
firstElectionTimeoutMin);
     RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(properties, 
firstElectionTimeoutMax);
 
+    // Set the rpc client timeout
+    TimeDuration clientRpcTimeout =
+        TimeDuration.valueOf(conf.haMasterRatisClientRpcTimeout(), 
TimeUnit.SECONDS);
+    TimeDuration clientRpcWatchTimeout =
+        TimeDuration.valueOf(conf.haMasterRatisClientRpcWatchTimeout(), 
TimeUnit.SECONDS);
+    RaftClientConfigKeys.Rpc.setRequestTimeout(properties, clientRpcTimeout);
+    RaftClientConfigKeys.Rpc.setWatchRequestTimeout(properties, 
clientRpcWatchTimeout);
+
     // Set the number of maximum cached segments
     RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2);
 
@@ -348,6 +357,10 @@ public class HARaftServer {
     long snapshotAutoTriggerThreshold = 
conf.haMasterRatisSnapshotAutoTriggerThreshold();
     RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, 
snapshotAutoTriggerThreshold);
 
+    for (Map.Entry<String, String> ratisEntry : 
conf.haRatisCustomConfigs().entrySet()) {
+      properties.set(ratisEntry.getKey().replace("celeborn.ratis.", ""), 
ratisEntry.getValue());
+    }
+
     return properties;
   }
 

Reply via email to