This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 77b1566ebd [IOTDB-3564] Reduce the number of I/O threads using thrift
asynchronous server mode for MultiLeaderConsensusRPC (#6528)
77b1566ebd is described below
commit 77b1566ebde468eb912110afda89308067d944d9
Author: hejiarui2015 <[email protected]>
AuthorDate: Fri Jul 1 14:03:25 2022 +0800
[IOTDB-3564] Reduce the number of I/O threads using thrift asynchronous
server mode for MultiLeaderConsensusRPC (#6528)
---
.../iotdb/consensus/config/MultiLeaderConfig.java | 46 ++++++++++++++++-
.../multileader/MultiLeaderConsensus.java | 2 +-
.../multileader/service/MultiLeaderRPCService.java | 17 +++++--
.../service/MultiLeaderRPCServiceProcessor.java | 59 ++++++++++++----------
.../service/AbstractThriftServiceThread.java | 34 ++++++++-----
.../iotdb/commons/service/ThriftServiceThread.java | 4 ++
server/file-changelists/conf-changelist.md | 3 ++
.../resources/conf/iotdb-datanode.properties | 6 +++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 22 ++++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 22 ++++++++
.../db/consensus/DataRegionConsensusImpl.java | 4 ++
11 files changed, 172 insertions(+), 47 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 119bd17b0a..06cc2e453c 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -66,23 +66,40 @@ public class MultiLeaderConfig {
}
public static class RPC {
+ private final int rpcSelectorThreadNum;
+ private final int rpcMinConcurrentClientNum;
private final int rpcMaxConcurrentClientNum;
private final int thriftServerAwaitTimeForStopService;
private final boolean isRpcThriftCompressionEnabled;
private final int selectorNumOfClientManager;
private final int connectionTimeoutInMs;
+ private final int thriftMaxFrameSize;
private RPC(
+ int rpcSelectorThreadNum,
+ int rpcMinConcurrentClientNum,
int rpcMaxConcurrentClientNum,
int thriftServerAwaitTimeForStopService,
boolean isRpcThriftCompressionEnabled,
int selectorNumOfClientManager,
- int connectionTimeoutInMs) {
+ int connectionTimeoutInMs,
+ int thriftMaxFrameSize) {
+ this.rpcSelectorThreadNum = rpcSelectorThreadNum;
+ this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
this.thriftServerAwaitTimeForStopService =
thriftServerAwaitTimeForStopService;
this.isRpcThriftCompressionEnabled = isRpcThriftCompressionEnabled;
this.selectorNumOfClientManager = selectorNumOfClientManager;
this.connectionTimeoutInMs = connectionTimeoutInMs;
+ this.thriftMaxFrameSize = thriftMaxFrameSize;
+ }
+
+ public int getRpcSelectorThreadNum() {
+ return rpcSelectorThreadNum;
+ }
+
+ public int getRpcMinConcurrentClientNum() {
+ return rpcMinConcurrentClientNum;
}
public int getRpcMaxConcurrentClientNum() {
@@ -105,16 +122,33 @@ public class MultiLeaderConfig {
return connectionTimeoutInMs;
}
+ public int getThriftMaxFrameSize() {
+ return thriftMaxFrameSize;
+ }
+
public static RPC.Builder newBuilder() {
return new RPC.Builder();
}
public static class Builder {
+ private int rpcSelectorThreadNum = 1;
+ private int rpcMinConcurrentClientNum =
Runtime.getRuntime().availableProcessors();
private int rpcMaxConcurrentClientNum = 65535;
private int thriftServerAwaitTimeForStopService = 60;
private boolean isRpcThriftCompressionEnabled = false;
private int selectorNumOfClientManager = 1;
private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(20);
+ private int thriftMaxFrameSize = 536870912;
+
+ public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
+ this.rpcSelectorThreadNum = rpcSelectorThreadNum;
+ return this;
+ }
+
+ public RPC.Builder setRpcMinConcurrentClientNum(int
rpcMinConcurrentClientNum) {
+ this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
+ return this;
+ }
public RPC.Builder setRpcMaxConcurrentClientNum(int
rpcMaxConcurrentClientNum) {
this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
@@ -142,13 +176,21 @@ public class MultiLeaderConfig {
return this;
}
+ public RPC.Builder setThriftMaxFrameSize(int thriftMaxFrameSize) {
+ this.thriftMaxFrameSize = thriftMaxFrameSize;
+ return this;
+ }
+
public RPC build() {
return new RPC(
+ rpcSelectorThreadNum,
+ rpcMinConcurrentClientNum,
rpcMaxConcurrentClientNum,
thriftServerAwaitTimeForStopService,
isRpcThriftCompressionEnabled,
selectorNumOfClientManager,
- connectionTimeoutInMs);
+ connectionTimeoutInMs,
+ thriftMaxFrameSize);
}
}
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index eecfa473c6..2ef4562f90 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -87,7 +87,7 @@ public class MultiLeaderConsensus implements IConsensus {
@Override
public void start() throws IOException {
initAndRecover();
- service.initSyncedServiceImpl(new MultiLeaderRPCServiceProcessor(this));
+ service.initAsyncedServiceImpl(new MultiLeaderRPCServiceProcessor(this));
try {
registerManager.register(service);
} catch (StartupException e) {
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
index 64dadf2aea..c94892a8b5 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCService.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.commons.service.ThriftServiceThread;
import org.apache.iotdb.consensus.config.MultiLeaderConfig;
import
org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
+import org.apache.thrift.TBaseAsyncProcessor;
+
import java.lang.reflect.InvocationTargetException;
public class MultiLeaderRPCService extends ThriftService implements
MultiLeaderRPCServiceMBean {
@@ -48,20 +50,20 @@ public class MultiLeaderRPCService extends ThriftService
implements MultiLeaderR
}
@Override
- public void initSyncedServiceImpl(Object multiLeaderRPCServiceProcessor) {
+ public void initAsyncedServiceImpl(Object multiLeaderRPCServiceProcessor) {
this.multiLeaderRPCServiceProcessor =
(MultiLeaderRPCServiceProcessor) multiLeaderRPCServiceProcessor;
super.mbeanName =
String.format(
"%s:%s=%s", this.getClass().getPackage(), IoTDBConstant.JMX_TYPE,
getID().getJmxName());
- super.initSyncedServiceImpl(this.multiLeaderRPCServiceProcessor);
+ super.initAsyncedServiceImpl(this.multiLeaderRPCServiceProcessor);
}
@Override
public void initTProcessor()
throws ClassNotFoundException, IllegalAccessException,
InstantiationException,
NoSuchMethodException, InvocationTargetException {
- processor = new
MultiLeaderConsensusIService.Processor<>(multiLeaderRPCServiceProcessor);
+ processor = new
MultiLeaderConsensusIService.AsyncProcessor<>(multiLeaderRPCServiceProcessor);
}
@Override
@@ -70,15 +72,20 @@ public class MultiLeaderRPCService extends ThriftService
implements MultiLeaderR
try {
thriftServiceThread =
new ThriftServiceThread(
- processor,
+ (TBaseAsyncProcessor) processor,
getID().getName(),
ThreadName.MULTI_LEADER_CONSENSUS_RPC_CLIENT.getName(),
getBindIP(),
getBindPort(),
+ config.getRpc().getRpcSelectorThreadNum(),
+ config.getRpc().getRpcMinConcurrentClientNum(),
config.getRpc().getRpcMaxConcurrentClientNum(),
config.getRpc().getThriftServerAwaitTimeForStopService(),
new MultiLeaderRPCServiceHandler(multiLeaderRPCServiceProcessor),
- config.getRpc().isRpcThriftCompressionEnabled());
+ config.getRpc().isRpcThriftCompressionEnabled(),
+ config.getRpc().getConnectionTimeoutInMs(),
+ config.getRpc().getThriftMaxFrameSize(),
+ ThriftServiceThread.ServerType.SELECTOR);
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index d829f403d0..196bf120ff 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-public class MultiLeaderRPCServiceProcessor implements
MultiLeaderConsensusIService.Iface {
+public class MultiLeaderRPCServiceProcessor implements
MultiLeaderConsensusIService.AsyncIface {
private final Logger logger =
LoggerFactory.getLogger(MultiLeaderRPCServiceProcessor.class);
@@ -49,33 +50,39 @@ public class MultiLeaderRPCServiceProcessor implements
MultiLeaderConsensusIServ
}
@Override
- public TSyncLogRes syncLog(TSyncLogReq req) throws TException {
- ConsensusGroupId groupId =
-
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- MultiLeaderServerImpl impl = consensus.getImpl(groupId);
- if (impl == null) {
- String message =
- String.format(
- "Unexpected consensusGroupId %s for TSyncLogReq which size is
%s",
- groupId, req.getBatches().size());
- logger.error(message);
- TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- status.setMessage(message);
- return new TSyncLogRes(Collections.singletonList(status));
- }
- List<TSStatus> statuses = new ArrayList<>();
- // We use synchronized to ensure atomicity of executing multiple logs
- synchronized (impl.getStateMachine()) {
- for (TLogBatch batch : req.getBatches()) {
- statuses.add(
- impl.getStateMachine()
- .write(
- impl.buildIndexedConsensusRequestForRemoteRequest(
- new ByteBufferConsensusRequest(batch.data))));
+ public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes>
resultHandler)
+ throws TException {
+ try {
+ ConsensusGroupId groupId =
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format(
+ "Unexpected consensusGroupId %s for TSyncLogReq which size is
%s",
+ groupId, req.getBatches().size());
+ logger.error(message);
+ TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ status.setMessage(message);
+ resultHandler.onComplete(new
TSyncLogRes(Collections.singletonList(status)));
+ return;
+ }
+ List<TSStatus> statuses = new ArrayList<>();
+ // We use synchronized to ensure atomicity of executing multiple logs
+ synchronized (impl.getStateMachine()) {
+ for (TLogBatch batch : req.getBatches()) {
+ statuses.add(
+ impl.getStateMachine()
+ .write(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ new ByteBufferConsensusRequest(batch.data))));
+ }
}
+ logger.debug("Execute TSyncLogReq for {} with result {}",
req.consensusGroupId, statuses);
+ resultHandler.onComplete(new TSyncLogRes(statuses));
+ } catch (Exception e) {
+ resultHandler.onError(e);
}
- logger.debug("Execute TSyncLogReq for {} with result {}",
req.consensusGroupId, statuses);
- return new TSyncLogRes(statuses);
}
public void handleClientExit() {}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
b/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
index 3ffadfc07f..86e581af91 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
@@ -103,6 +103,8 @@ public abstract class AbstractThriftServiceThread extends
Thread {
String threadsName,
String bindAddress,
int port,
+ int selectorThreads,
+ int minWorkerThreads,
int maxWorkerThreads,
int timeoutSecond,
TServerEventHandler serverEventHandler,
@@ -118,13 +120,24 @@ public abstract class AbstractThriftServiceThread extends
Thread {
case SELECTOR:
TThreadedSelectorServer.Args poolArgs =
initAsyncedSelectorPoolArgs(
- processor, threadsName, maxWorkerThreads, timeoutSecond,
maxReadBufferBytes);
+ processor,
+ threadsName,
+ selectorThreads,
+ minWorkerThreads,
+ maxWorkerThreads,
+ timeoutSecond,
+ maxReadBufferBytes);
poolServer = new TThreadedSelectorServer(poolArgs);
break;
case HSHA:
THsHaServer.Args poolArgs1 =
initAsyncedHshaPoolArgs(
- processor, threadsName, maxWorkerThreads, timeoutSecond,
maxReadBufferBytes);
+ processor,
+ threadsName,
+ minWorkerThreads,
+ maxWorkerThreads,
+ timeoutSecond,
+ maxReadBufferBytes);
poolServer = new THsHaServer(poolArgs1);
break;
default:
@@ -180,20 +193,18 @@ public abstract class AbstractThriftServiceThread extends
Thread {
private TThreadedSelectorServer.Args initAsyncedSelectorPoolArgs(
TBaseAsyncProcessor processor,
String threadsName,
+ int selectorThreads,
+ int minWorkerThreads,
int maxWorkerThreads,
int timeoutSecond,
int maxReadBufferBytes) {
TThreadedSelectorServer.Args poolArgs =
new TThreadedSelectorServer.Args((TNonblockingServerTransport)
serverTransport);
poolArgs.maxReadBufferBytes = maxReadBufferBytes;
- poolArgs.selectorThreads(Runtime.getRuntime().availableProcessors());
+ poolArgs.selectorThreads(selectorThreads);
poolArgs.executorService(
IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(
- Runtime.getRuntime().availableProcessors(),
- maxWorkerThreads,
- timeoutSecond,
- TimeUnit.SECONDS,
- threadsName));
+ minWorkerThreads, maxWorkerThreads, timeoutSecond,
TimeUnit.SECONDS, threadsName));
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
poolArgs.transportFactory(getTTransportFactory());
@@ -203,6 +214,7 @@ public abstract class AbstractThriftServiceThread extends
Thread {
private THsHaServer.Args initAsyncedHshaPoolArgs(
TBaseAsyncProcessor processor,
String threadsName,
+ int minWorkerThreads,
int maxWorkerThreads,
int timeoutSecond,
int maxReadBufferBytes) {
@@ -210,11 +222,7 @@ public abstract class AbstractThriftServiceThread extends
Thread {
poolArgs.maxReadBufferBytes = maxReadBufferBytes;
poolArgs.executorService(
IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(
- Runtime.getRuntime().availableProcessors(),
- maxWorkerThreads,
- timeoutSecond,
- TimeUnit.SECONDS,
- threadsName));
+ minWorkerThreads, maxWorkerThreads, timeoutSecond,
TimeUnit.SECONDS, threadsName));
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
poolArgs.transportFactory(getTTransportFactory());
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
index 365d7e6db0..5ce1746799 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
@@ -36,6 +36,8 @@ public class ThriftServiceThread extends
AbstractThriftServiceThread {
String threadsName,
String bindAddress,
int port,
+ int selectorThreads,
+ int minWorkerThreads,
int maxWorkerThreads,
int timeoutSecond,
TServerEventHandler serverEventHandler,
@@ -49,6 +51,8 @@ public class ThriftServiceThread extends
AbstractThriftServiceThread {
threadsName,
bindAddress,
port,
+ selectorThreads,
+ minWorkerThreads,
maxWorkerThreads,
timeoutSecond,
serverEventHandler,
diff --git a/server/file-changelists/conf-changelist.md
b/server/file-changelists/conf-changelist.md
index 645fe931b8..b5a09f57a7 100644
--- a/server/file-changelists/conf-changelist.md
+++ b/server/file-changelists/conf-changelist.md
@@ -85,6 +85,9 @@ mtree_snapshot_interval=100000
mtree_snapshot_threshold_time=3600
+rpc_selector_thread_num=1
+
+rpc_min_concurrent_client_num=1
## removeļ¼
merge_thread_num=1
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties
b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 5f7eb7c0a1..65355f51ad 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -76,6 +76,12 @@ target_config_nodes=127.0.0.1:22277
# this feature is under development, set this as false before it is done.
# rpc_advanced_compression_enable=false
+# Datatype: int
+# rpc_selector_thread_num=1
+
+# Datatype: int
+# rpc_min_concurrent_client_num=1
+
# Datatype: int
# rpc_max_concurrent_client_num=65535
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e8470e94ef..a2c8ce8faa 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -114,6 +114,12 @@ public class IoTDBConfig {
/** Port which the influxdb protocol server listens to. */
private int influxDBRpcPort = 8086;
+ /** Rpc Selector thread num */
+ private int rpcSelectorThreadNum = 1;
+
+ /** Min concurrent client number */
+ private int rpcMinConcurrentClientNum =
Runtime.getRuntime().availableProcessors();
+
/** Max concurrent client number */
private int rpcMaxConcurrentClientNum = 65535;
@@ -1383,6 +1389,22 @@ public class IoTDBConfig {
this.unSeqTsFileSize = unSeqTsFileSize;
}
+ public int getRpcSelectorThreadNum() {
+ return rpcSelectorThreadNum;
+ }
+
+ public void setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
+ this.rpcSelectorThreadNum = rpcSelectorThreadNum;
+ }
+
+ public int getRpcMinConcurrentClientNum() {
+ return rpcMinConcurrentClientNum;
+ }
+
+ public void setRpcMinConcurrentClientNum(int rpcMinConcurrentClientNum) {
+ this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
+ }
+
public int getRpcMaxConcurrentClientNum() {
return rpcMaxConcurrentClientNum;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8668ab986a..a25d6674d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -602,6 +602,28 @@ public class IoTDBDescriptor {
properties.getProperty(
"enable_partial_insert",
String.valueOf(conf.isEnablePartialInsert()))));
+ int rpcSelectorThreadNum =
+ Integer.parseInt(
+ properties.getProperty(
+ "rpc_selector_thread_num",
+ Integer.toString(conf.getRpcSelectorThreadNum()).trim()));
+ if (rpcSelectorThreadNum <= 0) {
+ rpcSelectorThreadNum = 1;
+ }
+
+ conf.setRpcSelectorThreadNum(rpcSelectorThreadNum);
+
+ int minConcurrentClientNum =
+ Integer.parseInt(
+ properties.getProperty(
+ "rpc_min_concurrent_client_num",
+
Integer.toString(conf.getRpcMinConcurrentClientNum()).trim()));
+ if (minConcurrentClientNum <= 0) {
+ minConcurrentClientNum = Runtime.getRuntime().availableProcessors();
+ }
+
+ conf.setRpcMinConcurrentClientNum(minConcurrentClientNum);
+
int maxConcurrentClientNum =
Integer.parseInt(
properties.getProperty(
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 08e3e74ef1..8e2d726ac6 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -64,6 +64,9 @@ public class DataRegionConsensusImpl {
.setRpc(
RPC.newBuilder()
.setConnectionTimeoutInMs(conf.getConnectionTimeoutInMS())
+
.setRpcSelectorThreadNum(conf.getRpcSelectorThreadNum())
+ .setRpcMinConcurrentClientNum(
+ conf.getRpcMinConcurrentClientNum())
.setRpcMaxConcurrentClientNum(
conf.getRpcMaxConcurrentClientNum())
.setRpcThriftCompressionEnabled(
@@ -72,6 +75,7 @@ public class DataRegionConsensusImpl {
conf.getSelectorNumOfClientManager())
.setThriftServerAwaitTimeForStopService(
conf.getThriftServerAwaitTimeForStopService())
+
.setThriftMaxFrameSize(conf.getThriftMaxFrameSize())
.build())
.build())
.setRatisConfig(