This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 77b1566ebd [IOTDB-3564] Reduce the number of I/O threads using thrift 
asynchronous server mode for MultiLeaderConsensusRPC (#6528)
77b1566ebd is described below

commit 77b1566ebde468eb912110afda89308067d944d9
Author: hejiarui2015 <[email protected]>
AuthorDate: Fri Jul 1 14:03:25 2022 +0800

    [IOTDB-3564] Reduce the number of I/O threads using thrift asynchronous 
server mode for MultiLeaderConsensusRPC (#6528)
---
 .../iotdb/consensus/config/MultiLeaderConfig.java  | 46 ++++++++++++++++-
 .../multileader/MultiLeaderConsensus.java          |  2 +-
 .../multileader/service/MultiLeaderRPCService.java | 17 +++++--
 .../service/MultiLeaderRPCServiceProcessor.java    | 59 ++++++++++++----------
 .../service/AbstractThriftServiceThread.java       | 34 ++++++++-----
 .../iotdb/commons/service/ThriftServiceThread.java |  4 ++
 server/file-changelists/conf-changelist.md         |  3 ++
 .../resources/conf/iotdb-datanode.properties       |  6 +++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 22 ++++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 22 ++++++++
 .../db/consensus/DataRegionConsensusImpl.java      |  4 ++
 11 files changed, 172 insertions(+), 47 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 119bd17b0a..06cc2e453c 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -66,23 +66,40 @@ public class MultiLeaderConfig {
   }
 
   public static class RPC {
+    private final int rpcSelectorThreadNum;
+    private final int rpcMinConcurrentClientNum;
     private final int rpcMaxConcurrentClientNum;
     private final int thriftServerAwaitTimeForStopService;
     private final boolean isRpcThriftCompressionEnabled;
     private final int selectorNumOfClientManager;
     private final int connectionTimeoutInMs;
+    private final int thriftMaxFrameSize;
 
     private RPC(
+        int rpcSelectorThreadNum,
+        int rpcMinConcurrentClientNum,
         int rpcMaxConcurrentClientNum,
         int thriftServerAwaitTimeForStopService,
         boolean isRpcThriftCompressionEnabled,
         int selectorNumOfClientManager,
-        int connectionTimeoutInMs) {
+        int connectionTimeoutInMs,
+        int thriftMaxFrameSize) {
+      this.rpcSelectorThreadNum = rpcSelectorThreadNum;
+      this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
       this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
       this.thriftServerAwaitTimeForStopService = 
thriftServerAwaitTimeForStopService;
       this.isRpcThriftCompressionEnabled = isRpcThriftCompressionEnabled;
       this.selectorNumOfClientManager = selectorNumOfClientManager;
       this.connectionTimeoutInMs = connectionTimeoutInMs;
+      this.thriftMaxFrameSize = thriftMaxFrameSize;
+    }
+
+    public int getRpcSelectorThreadNum() {
+      return rpcSelectorThreadNum;
+    }
+
+    public int getRpcMinConcurrentClientNum() {
+      return rpcMinConcurrentClientNum;
     }
 
     public int getRpcMaxConcurrentClientNum() {
@@ -105,16 +122,33 @@ public class MultiLeaderConfig {
       return connectionTimeoutInMs;
     }
 
+    public int getThriftMaxFrameSize() {
+      return thriftMaxFrameSize;
+    }
+
     public static RPC.Builder newBuilder() {
       return new RPC.Builder();
     }
 
     public static class Builder {
+      private int rpcSelectorThreadNum = 1;
+      private int rpcMinConcurrentClientNum = 
Runtime.getRuntime().availableProcessors();
       private int rpcMaxConcurrentClientNum = 65535;
       private int thriftServerAwaitTimeForStopService = 60;
       private boolean isRpcThriftCompressionEnabled = false;
       private int selectorNumOfClientManager = 1;
       private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(20);
+      private int thriftMaxFrameSize = 536870912;
+
+      public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
+        this.rpcSelectorThreadNum = rpcSelectorThreadNum;
+        return this;
+      }
+
+      public RPC.Builder setRpcMinConcurrentClientNum(int 
rpcMinConcurrentClientNum) {
+        this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
+        return this;
+      }
 
       public RPC.Builder setRpcMaxConcurrentClientNum(int 
rpcMaxConcurrentClientNum) {
         this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
@@ -142,13 +176,21 @@ public class MultiLeaderConfig {
         return this;
       }
 
+      public RPC.Builder setThriftMaxFrameSize(int thriftMaxFrameSize) {
+        this.thriftMaxFrameSize = thriftMaxFrameSize;
+        return this;
+      }
+
       public RPC build() {
         return new RPC(
+            rpcSelectorThreadNum,
+            rpcMinConcurrentClientNum,
             rpcMaxConcurrentClientNum,
             thriftServerAwaitTimeForStopService,
             isRpcThriftCompressionEnabled,
             selectorNumOfClientManager,
-            connectionTimeoutInMs);
+            connectionTimeoutInMs,
+            thriftMaxFrameSize);
       }
     }
   }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index eecfa473c6..2ef4562f90 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -87,7 +87,7 @@ public class MultiLeaderConsensus implements IConsensus {
   @Override
   public void start() throws IOException {
     initAndRecover();
-    service.initSyncedServiceImpl(new MultiLeaderRPCServiceProcessor(this));
+    service.initAsyncedServiceImpl(new MultiLeaderRPCServiceProcessor(this));
     try {
       registerManager.register(service);
     } catch (StartupException e) {
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
index 64dadf2aea..c94892a8b5 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.commons.service.ThriftServiceThread;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import 
org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
 
+import org.apache.thrift.TBaseAsyncProcessor;
+
 import java.lang.reflect.InvocationTargetException;
 
 public class MultiLeaderRPCService extends ThriftService implements 
MultiLeaderRPCServiceMBean {
@@ -48,20 +50,20 @@ public class MultiLeaderRPCService extends ThriftService 
implements MultiLeaderR
   }
 
   @Override
-  public void initSyncedServiceImpl(Object multiLeaderRPCServiceProcessor) {
+  public void initAsyncedServiceImpl(Object multiLeaderRPCServiceProcessor) {
     this.multiLeaderRPCServiceProcessor =
         (MultiLeaderRPCServiceProcessor) multiLeaderRPCServiceProcessor;
     super.mbeanName =
         String.format(
             "%s:%s=%s", this.getClass().getPackage(), IoTDBConstant.JMX_TYPE, 
getID().getJmxName());
-    super.initSyncedServiceImpl(this.multiLeaderRPCServiceProcessor);
+    super.initAsyncedServiceImpl(this.multiLeaderRPCServiceProcessor);
   }
 
   @Override
   public void initTProcessor()
       throws ClassNotFoundException, IllegalAccessException, 
InstantiationException,
           NoSuchMethodException, InvocationTargetException {
-    processor = new 
MultiLeaderConsensusIService.Processor<>(multiLeaderRPCServiceProcessor);
+    processor = new 
MultiLeaderConsensusIService.AsyncProcessor<>(multiLeaderRPCServiceProcessor);
   }
 
   @Override
@@ -70,15 +72,20 @@ public class MultiLeaderRPCService extends ThriftService 
implements MultiLeaderR
     try {
       thriftServiceThread =
           new ThriftServiceThread(
-              processor,
+              (TBaseAsyncProcessor) processor,
               getID().getName(),
               ThreadName.MULTI_LEADER_CONSENSUS_RPC_CLIENT.getName(),
               getBindIP(),
               getBindPort(),
+              config.getRpc().getRpcSelectorThreadNum(),
+              config.getRpc().getRpcMinConcurrentClientNum(),
               config.getRpc().getRpcMaxConcurrentClientNum(),
               config.getRpc().getThriftServerAwaitTimeForStopService(),
               new MultiLeaderRPCServiceHandler(multiLeaderRPCServiceProcessor),
-              config.getRpc().isRpcThriftCompressionEnabled());
+              config.getRpc().isRpcThriftCompressionEnabled(),
+              config.getRpc().getConnectionTimeoutInMs(),
+              config.getRpc().getThriftMaxFrameSize(),
+              ThriftServiceThread.ServerType.SELECTOR);
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index d829f403d0..196bf120ff 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +39,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-public class MultiLeaderRPCServiceProcessor implements 
MultiLeaderConsensusIService.Iface {
+public class MultiLeaderRPCServiceProcessor implements 
MultiLeaderConsensusIService.AsyncIface {
 
   private final Logger logger = 
LoggerFactory.getLogger(MultiLeaderRPCServiceProcessor.class);
 
@@ -49,33 +50,39 @@ public class MultiLeaderRPCServiceProcessor implements 
MultiLeaderConsensusIServ
   }
 
   @Override
-  public TSyncLogRes syncLog(TSyncLogReq req) throws TException {
-    ConsensusGroupId groupId =
-        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
-    MultiLeaderServerImpl impl = consensus.getImpl(groupId);
-    if (impl == null) {
-      String message =
-          String.format(
-              "Unexpected consensusGroupId %s for TSyncLogReq which size is 
%s",
-              groupId, req.getBatches().size());
-      logger.error(message);
-      TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-      status.setMessage(message);
-      return new TSyncLogRes(Collections.singletonList(status));
-    }
-    List<TSStatus> statuses = new ArrayList<>();
-    // We use synchronized to ensure atomicity of executing multiple logs
-    synchronized (impl.getStateMachine()) {
-      for (TLogBatch batch : req.getBatches()) {
-        statuses.add(
-            impl.getStateMachine()
-                .write(
-                    impl.buildIndexedConsensusRequestForRemoteRequest(
-                        new ByteBufferConsensusRequest(batch.data))));
+  public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes> 
resultHandler)
+      throws TException {
+    try {
+      ConsensusGroupId groupId =
+          
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+      MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+      if (impl == null) {
+        String message =
+            String.format(
+                "Unexpected consensusGroupId %s for TSyncLogReq which size is 
%s",
+                groupId, req.getBatches().size());
+        logger.error(message);
+        TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+        status.setMessage(message);
+        resultHandler.onComplete(new 
TSyncLogRes(Collections.singletonList(status)));
+        return;
+      }
+      List<TSStatus> statuses = new ArrayList<>();
+      // We use synchronized to ensure atomicity of executing multiple logs
+      synchronized (impl.getStateMachine()) {
+        for (TLogBatch batch : req.getBatches()) {
+          statuses.add(
+              impl.getStateMachine()
+                  .write(
+                      impl.buildIndexedConsensusRequestForRemoteRequest(
+                          new ByteBufferConsensusRequest(batch.data))));
+        }
       }
+      logger.debug("Execute TSyncLogReq for {} with result {}", 
req.consensusGroupId, statuses);
+      resultHandler.onComplete(new TSyncLogRes(statuses));
+    } catch (Exception e) {
+      resultHandler.onError(e);
     }
-    logger.debug("Execute TSyncLogReq for {} with result {}", 
req.consensusGroupId, statuses);
-    return new TSyncLogRes(statuses);
   }
 
   public void handleClientExit() {}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
index 3ffadfc07f..86e581af91 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
@@ -103,6 +103,8 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
       String threadsName,
       String bindAddress,
       int port,
+      int selectorThreads,
+      int minWorkerThreads,
       int maxWorkerThreads,
       int timeoutSecond,
       TServerEventHandler serverEventHandler,
@@ -118,13 +120,24 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
         case SELECTOR:
           TThreadedSelectorServer.Args poolArgs =
               initAsyncedSelectorPoolArgs(
-                  processor, threadsName, maxWorkerThreads, timeoutSecond, 
maxReadBufferBytes);
+                  processor,
+                  threadsName,
+                  selectorThreads,
+                  minWorkerThreads,
+                  maxWorkerThreads,
+                  timeoutSecond,
+                  maxReadBufferBytes);
           poolServer = new TThreadedSelectorServer(poolArgs);
           break;
         case HSHA:
           THsHaServer.Args poolArgs1 =
               initAsyncedHshaPoolArgs(
-                  processor, threadsName, maxWorkerThreads, timeoutSecond, 
maxReadBufferBytes);
+                  processor,
+                  threadsName,
+                  minWorkerThreads,
+                  maxWorkerThreads,
+                  timeoutSecond,
+                  maxReadBufferBytes);
           poolServer = new THsHaServer(poolArgs1);
           break;
         default:
@@ -180,20 +193,18 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
   private TThreadedSelectorServer.Args initAsyncedSelectorPoolArgs(
       TBaseAsyncProcessor processor,
       String threadsName,
+      int selectorThreads,
+      int minWorkerThreads,
       int maxWorkerThreads,
       int timeoutSecond,
       int maxReadBufferBytes) {
     TThreadedSelectorServer.Args poolArgs =
         new TThreadedSelectorServer.Args((TNonblockingServerTransport) 
serverTransport);
     poolArgs.maxReadBufferBytes = maxReadBufferBytes;
-    poolArgs.selectorThreads(Runtime.getRuntime().availableProcessors());
+    poolArgs.selectorThreads(selectorThreads);
     poolArgs.executorService(
         IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(
-            Runtime.getRuntime().availableProcessors(),
-            maxWorkerThreads,
-            timeoutSecond,
-            TimeUnit.SECONDS,
-            threadsName));
+            minWorkerThreads, maxWorkerThreads, timeoutSecond, 
TimeUnit.SECONDS, threadsName));
     poolArgs.processor(processor);
     poolArgs.protocolFactory(protocolFactory);
     poolArgs.transportFactory(getTTransportFactory());
@@ -203,6 +214,7 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
   private THsHaServer.Args initAsyncedHshaPoolArgs(
       TBaseAsyncProcessor processor,
       String threadsName,
+      int minWorkerThreads,
       int maxWorkerThreads,
       int timeoutSecond,
       int maxReadBufferBytes) {
@@ -210,11 +222,7 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
     poolArgs.maxReadBufferBytes = maxReadBufferBytes;
     poolArgs.executorService(
         IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(
-            Runtime.getRuntime().availableProcessors(),
-            maxWorkerThreads,
-            timeoutSecond,
-            TimeUnit.SECONDS,
-            threadsName));
+            minWorkerThreads, maxWorkerThreads, timeoutSecond, 
TimeUnit.SECONDS, threadsName));
     poolArgs.processor(processor);
     poolArgs.protocolFactory(protocolFactory);
     poolArgs.transportFactory(getTTransportFactory());
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
index 365d7e6db0..5ce1746799 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
@@ -36,6 +36,8 @@ public class ThriftServiceThread extends 
AbstractThriftServiceThread {
       String threadsName,
       String bindAddress,
       int port,
+      int selectorThreads,
+      int minWorkerThreads,
       int maxWorkerThreads,
       int timeoutSecond,
       TServerEventHandler serverEventHandler,
@@ -49,6 +51,8 @@ public class ThriftServiceThread extends 
AbstractThriftServiceThread {
         threadsName,
         bindAddress,
         port,
+        selectorThreads,
+        minWorkerThreads,
         maxWorkerThreads,
         timeoutSecond,
         serverEventHandler,
diff --git a/server/file-changelists/conf-changelist.md 
b/server/file-changelists/conf-changelist.md
index 645fe931b8..b5a09f57a7 100644
--- a/server/file-changelists/conf-changelist.md
+++ b/server/file-changelists/conf-changelist.md
@@ -85,6 +85,9 @@ mtree_snapshot_interval=100000
 
 mtree_snapshot_threshold_time=3600
 
+rpc_selector_thread_num=1
+
+rpc_min_concurrent_client_num=1
 
 ## remove:
 merge_thread_num=1
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties 
b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 5f7eb7c0a1..65355f51ad 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -76,6 +76,12 @@ target_config_nodes=127.0.0.1:22277
 # this feature is under development, set this as false before it is done.
 # rpc_advanced_compression_enable=false
 
+# Datatype: int
+# rpc_selector_thread_num=1
+
+# Datatype: int
+# rpc_min_concurrent_client_num=1
+
 # Datatype: int
 # rpc_max_concurrent_client_num=65535
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e8470e94ef..a2c8ce8faa 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -114,6 +114,12 @@ public class IoTDBConfig {
   /** Port which the influxdb protocol server listens to. */
   private int influxDBRpcPort = 8086;
 
+  /** Rpc Selector thread num */
+  private int rpcSelectorThreadNum = 1;
+
+  /** Min concurrent client number */
+  private int rpcMinConcurrentClientNum = 
Runtime.getRuntime().availableProcessors();
+
   /** Max concurrent client number */
   private int rpcMaxConcurrentClientNum = 65535;
 
@@ -1383,6 +1389,22 @@ public class IoTDBConfig {
     this.unSeqTsFileSize = unSeqTsFileSize;
   }
 
+  public int getRpcSelectorThreadNum() {
+    return rpcSelectorThreadNum;
+  }
+
+  public void setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
+    this.rpcSelectorThreadNum = rpcSelectorThreadNum;
+  }
+
+  public int getRpcMinConcurrentClientNum() {
+    return rpcMinConcurrentClientNum;
+  }
+
+  public void setRpcMinConcurrentClientNum(int rpcMinConcurrentClientNum) {
+    this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
+  }
+
   public int getRpcMaxConcurrentClientNum() {
     return rpcMaxConcurrentClientNum;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8668ab986a..a25d6674d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -602,6 +602,28 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "enable_partial_insert", 
String.valueOf(conf.isEnablePartialInsert()))));
 
+      int rpcSelectorThreadNum =
+          Integer.parseInt(
+              properties.getProperty(
+                  "rpc_selector_thread_num",
+                  Integer.toString(conf.getRpcSelectorThreadNum()).trim()));
+      if (rpcSelectorThreadNum <= 0) {
+        rpcSelectorThreadNum = 1;
+      }
+
+      conf.setRpcSelectorThreadNum(rpcSelectorThreadNum);
+
+      int minConcurrentClientNum =
+          Integer.parseInt(
+              properties.getProperty(
+                  "rpc_min_concurrent_client_num",
+                  
Integer.toString(conf.getRpcMinConcurrentClientNum()).trim()));
+      if (minConcurrentClientNum <= 0) {
+        minConcurrentClientNum = Runtime.getRuntime().availableProcessors();
+      }
+
+      conf.setRpcMinConcurrentClientNum(minConcurrentClientNum);
+
       int maxConcurrentClientNum =
           Integer.parseInt(
               properties.getProperty(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 08e3e74ef1..8e2d726ac6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -64,6 +64,9 @@ public class DataRegionConsensusImpl {
                               .setRpc(
                                   RPC.newBuilder()
                                       
.setConnectionTimeoutInMs(conf.getConnectionTimeoutInMS())
+                                      
.setRpcSelectorThreadNum(conf.getRpcSelectorThreadNum())
+                                      .setRpcMinConcurrentClientNum(
+                                          conf.getRpcMinConcurrentClientNum())
                                       .setRpcMaxConcurrentClientNum(
                                           conf.getRpcMaxConcurrentClientNum())
                                       .setRpcThriftCompressionEnabled(
@@ -72,6 +75,7 @@ public class DataRegionConsensusImpl {
                                           conf.getSelectorNumOfClientManager())
                                       .setThriftServerAwaitTimeForStopService(
                                           
conf.getThriftServerAwaitTimeForStopService())
+                                      
.setThriftMaxFrameSize(conf.getThriftMaxFrameSize())
                                       .build())
                               .build())
                       .setRatisConfig(

Reply via email to