This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 77e9403058b [To dev/1.3] Clean up dead RPC thread config and use node-specific selectorNum (#17618)
77e9403058b is described below
commit 77e9403058bf7e2197c26cd40ca1469310020e83
Author: Jackie Tien <[email protected]>
AuthorDate: Fri May 8 14:43:28 2026 +0800
[To dev/1.3] Clean up dead RPC thread config and use node-specific selectorNum (#17618)
---
.../org/apache/iotdb/it/env/cluster/EnvUtils.java | 2 +-
.../iotdb/it/env/cluster/config/MppBaseConfig.java | 2 +
.../async/AsyncAINodeHeartbeatClientPool.java | 4 +-
.../async/AsyncConfigNodeHeartbeatClientPool.java | 4 +-
.../async/AsyncDataNodeHeartbeatClientPool.java | 4 +-
.../CnToCnInternalServiceAsyncRequestManager.java | 8 ++-
.../CnToDnInternalServiceAsyncRequestManager.java | 5 ++
.../iotdb/confignode/conf/ConfigNodeConfig.java | 28 ++++++++++
.../confignode/conf/ConfigNodeDescriptor.java | 18 +++++++
.../iotdb/consensus/config/IoTConsensusConfig.java | 22 --------
.../consensus/config/PipeConsensusConfig.java | 28 ----------
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 43 ++++------------
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 49 ++++++------------
.../db/consensus/DataRegionConsensusImpl.java | 4 --
.../protocol/client/DataNodeClientPoolFactory.java | 28 ----------
.../DnToCnInternalServiceAsyncRequestManager.java | 5 ++
...DataNodeExternalServiceAsyncRequestManager.java | 10 +++-
.../dn/DataNodeMPPServiceAsyncRequestManager.java | 10 ++--
.../DnToDnInternalServiceAsyncRequestManager.java | 5 ++
.../iotdb/db/queryengine/plan/Coordinator.java | 13 +----
.../config/executor/ClusterConfigTaskExecutor.java | 24 +++------
.../queryengine/plan/planner/TreeModelPlanner.java | 9 ----
.../plan/scheduler/ClusterScheduler.java | 5 --
.../scheduler/FragmentInstanceDispatcherImpl.java | 7 ---
.../conf/iotdb-system.properties.template | 32 +++++++-----
.../iotdb/commons/client/ClientPoolFactory.java | 60 +++++++++++++++++++---
.../client/property/ClientPoolProperty.java | 16 +++++-
.../client/property/ThriftClientProperty.java | 5 +-
.../client/request/AsyncRequestManager.java | 6 +--
...nfigNodeInternalServiceAsyncRequestManager.java | 10 +++-
.../DataNodeInternalServiceRequestManager.java | 10 +++-
.../iotdb/commons/concurrent/ThreadName.java | 2 -
.../apache/iotdb/commons/conf/CommonConfig.java | 15 +++++-
.../iotdb/commons/conf/CommonDescriptor.java | 36 +++++++++++--
.../iotdb/commons/client/ClientManagerTest.java | 19 ++++++-
35 files changed, 304 insertions(+), 244 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
index 9663fa371e9..136399a3778 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
@@ -117,7 +117,7 @@ public class EnvUtils {
}
private static String getUnixSearchPortCmd(final List<Integer> ports) {
- return "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E "
+ return "lsof -iTCP -sTCP:LISTEN,TIME_WAIT -P -n | awk '{print $9}' | grep -E "
+ ports.stream().map(String::valueOf).collect(Collectors.joining("|"))
+ "\"";
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java
index 473d2f1b64c..975ae1a5682 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java
@@ -40,6 +40,8 @@ public abstract class MppBaseConfig {
/** Create an empty MppPersistentConfig. */
protected MppBaseConfig() {
this.properties = new Properties();
+ this.properties.setProperty("cn_selector_thread_nums_of_client_manager", "1");
+ this.properties.setProperty("dn_selector_thread_nums_of_client_manager", "1");
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
index e09ccc79bec..8ec455142b8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.ainode.AsyncAINodeServiceClient;
import
org.apache.iotdb.confignode.client.async.handlers.heartbeat.AINodeHeartbeatHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
public class AsyncAINodeHeartbeatClientPool {
@@ -34,7 +35,8 @@ public class AsyncAINodeHeartbeatClientPool {
clientManager =
new IClientManager.Factory<TEndPoint, AsyncAINodeServiceClient>()
.createClientManager(
- new ClientPoolFactory.AsyncAINodeHeartbeatServiceClientPoolFactory());
+ new ClientPoolFactory.AsyncAINodeHeartbeatServiceClientPoolFactory(
+ ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()));
}
public void getAINodeHeartBeat(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
index 7b6bca5d0d9..a6dffbe0eef 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
import
org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq;
public class AsyncConfigNodeHeartbeatClientPool {
@@ -34,7 +35,8 @@ public class AsyncConfigNodeHeartbeatClientPool {
clientManager =
new IClientManager.Factory<TEndPoint,
AsyncConfigNodeInternalServiceClient>()
.createClientManager(
- new ClientPoolFactory.AsyncConfigNodeHeartbeatServiceClientPoolFactory());
+ new ClientPoolFactory.AsyncConfigNodeHeartbeatServiceClientPoolFactory(
+ ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()));
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
index 8d67d150efb..18a8120a9e4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import
org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
/** Asynchronously send RPC requests to DataNodes. See queryengine.thrift for
more details. */
@@ -35,7 +36,8 @@ public class AsyncDataNodeHeartbeatClientPool {
clientManager =
new IClientManager.Factory<TEndPoint,
AsyncDataNodeInternalServiceClient>()
.createClientManager(
- new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory());
+ new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory(
+ ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()));
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
index 19eaf9d9a40..00267e5a8cc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeAsyncRequestRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeTSStatusRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.SubmitTestConnectionTaskToConfigNodeRPCHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,10 @@ public class CnToCnInternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(CnToCnInternalServiceAsyncRequestManager.class);
+ public CnToCnInternalServiceAsyncRequestManager(int selectorNumOfAsyncClientManager) {
+ super(selectorNumOfAsyncClientManager);
+ }
+
@Override
protected void initActionMapBuilder() {
actionMapBuilder.put(
@@ -71,7 +76,8 @@ public class CnToCnInternalServiceAsyncRequestManager
private static class ClientPoolHolder {
private static final CnToCnInternalServiceAsyncRequestManager INSTANCE =
- new CnToCnInternalServiceAsyncRequestManager();
+ new CnToCnInternalServiceAsyncRequestManager(
+ ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager());
private ClientPoolHolder() {
// Empty constructor
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index b9aac92a165..e4dcaad2bfb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -46,6 +46,7 @@ import
org.apache.iotdb.confignode.client.async.handlers.rpc.TransferLeaderRPCHa
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterViewReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateReq;
@@ -105,6 +106,10 @@ public class CnToDnInternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(CnToDnInternalServiceAsyncRequestManager.class);
+ private CnToDnInternalServiceAsyncRequestManager() {
+ super(ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager());
+ }
+
@SuppressWarnings("unchecked")
@Override
protected void initActionMapBuilder() {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 03d12ee46a6..77b353ea59d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -136,6 +136,17 @@ public class ConfigNodeConfig {
*/
private int maxClientNumForEachNode =
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+ private int maxIdleClientNumForEachNode = DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE;
+
+ /**
+ * ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its
+ * clients.
+ */
+ private int selectorNumOfClientManager =
+ Runtime.getRuntime().availableProcessors() / 4 > 0
+ ? Runtime.getRuntime().availableProcessors() / 4
+ : 1;
+
/** System directory, including version file for each database and metadata.
*/
private String systemDir =
IoTDBConstant.CN_DEFAULT_DATA_DIR + File.separator +
IoTDBConstant.SYSTEM_FOLDER_NAME;
@@ -448,6 +459,23 @@ public class ConfigNodeConfig {
return this;
}
+ public int getMaxIdleClientNumForEachNode() {
+ return maxIdleClientNumForEachNode;
+ }
+
+ public ConfigNodeConfig setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) {
+ this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode;
+ return this;
+ }
+
+ public int getSelectorNumOfClientManager() {
+ return selectorNumOfClientManager;
+ }
+
+ public void setSelectorNumOfClientManager(int selectorNumOfClientManager) {
+ this.selectorNumOfClientManager = selectorNumOfClientManager;
+ }
+
public String getConsensusDir() {
return consensusDir;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 614bae210df..83cf1b612d0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -299,6 +299,24 @@ public class ConfigNodeDescriptor {
String.valueOf(conf.getMaxClientNumForEachNode()))
.trim()));
+ int cnMaxIdleClientNumForEachNode =
+ Integer.parseInt(
+ properties.getProperty(
+ "cn_max_idle_client_count_for_each_node_in_client_manager",
+ String.valueOf(conf.getMaxIdleClientNumForEachNode())));
+ if (cnMaxIdleClientNumForEachNode >= 0) {
+ conf.setMaxIdleClientNumForEachNode(cnMaxIdleClientNumForEachNode);
+ }
+
+ int cnSelectorNumOfClientManager =
+ Integer.parseInt(
+ properties.getProperty(
+ "cn_selector_thread_nums_of_client_manager",
+ String.valueOf(conf.getSelectorNumOfClientManager())));
+ if (cnSelectorNumOfClientManager > 0) {
+ conf.setSelectorNumOfClientManager(cnSelectorNumOfClientManager);
+ }
+
conf.setSystemDir(properties.getProperty("cn_system_dir",
conf.getSystemDir()).trim());
conf.setConsensusDir(properties.getProperty("cn_consensus_dir",
conf.getConsensusDir()).trim());
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index 6c89e19291b..25b084c5329 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -70,8 +70,6 @@ public class IoTConsensusConfig {
public static class RPC {
- private final int rpcSelectorThreadNum;
- private final int rpcMinConcurrentClientNum;
private final int rpcMaxConcurrentClientNum;
private final int thriftServerAwaitTimeForStopService;
private final boolean isRpcThriftCompressionEnabled;
@@ -83,8 +81,6 @@ public class IoTConsensusConfig {
private final int maxClientNumForEachNode;
private RPC(
- int rpcSelectorThreadNum,
- int rpcMinConcurrentClientNum,
int rpcMaxConcurrentClientNum,
int thriftServerAwaitTimeForStopService,
boolean isRpcThriftCompressionEnabled,
@@ -93,8 +89,6 @@ public class IoTConsensusConfig {
boolean printLogWhenThriftClientEncounterException,
int thriftMaxFrameSize,
int maxClientNumForEachNode) {
- this.rpcSelectorThreadNum = rpcSelectorThreadNum;
- this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
this.thriftServerAwaitTimeForStopService =
thriftServerAwaitTimeForStopService;
this.isRpcThriftCompressionEnabled = isRpcThriftCompressionEnabled;
@@ -105,14 +99,6 @@ public class IoTConsensusConfig {
this.maxClientNumForEachNode = maxClientNumForEachNode;
}
- public int getRpcSelectorThreadNum() {
- return rpcSelectorThreadNum;
- }
-
- public int getRpcMinConcurrentClientNum() {
- return rpcMinConcurrentClientNum;
- }
-
public int getRpcMaxConcurrentClientNum() {
return rpcMaxConcurrentClientNum;
}
@@ -151,7 +137,6 @@ public class IoTConsensusConfig {
public static class Builder {
- private int rpcSelectorThreadNum = 1;
private int rpcMinConcurrentClientNum =
Runtime.getRuntime().availableProcessors();
private int rpcMaxConcurrentClientNum = 65535;
private int thriftServerAwaitTimeForStopService = 60;
@@ -163,11 +148,6 @@ public class IoTConsensusConfig {
private int thriftMaxFrameSize = 536870912;
private int maxClientNumForEachNode =
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
- public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
- this.rpcSelectorThreadNum = rpcSelectorThreadNum;
- return this;
- }
-
public RPC.Builder setRpcMinConcurrentClientNum(int
rpcMinConcurrentClientNum) {
this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
return this;
@@ -218,8 +198,6 @@ public class IoTConsensusConfig {
public RPC build() {
return new RPC(
- rpcSelectorThreadNum,
- rpcMinConcurrentClientNum,
rpcMaxConcurrentClientNum,
thriftServerAwaitTimeForStopService,
isRpcThriftCompressionEnabled,
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
index 2cb149b601b..81c6e53e73a 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java
@@ -79,8 +79,6 @@ public class PipeConsensusConfig {
}
public static class RPC {
- private final int rpcSelectorThreadNum;
- private final int rpcMinConcurrentClientNum;
private final int rpcMaxConcurrentClientNum;
private final int thriftServerAwaitTimeForStopService;
private final boolean isRpcThriftCompressionEnabled;
@@ -88,15 +86,11 @@ public class PipeConsensusConfig {
private final int thriftMaxFrameSize;
public RPC(
- int rpcSelectorThreadNum,
- int rpcMinConcurrentClientNum,
int rpcMaxConcurrentClientNum,
int thriftServerAwaitTimeForStopService,
boolean isRpcThriftCompressionEnabled,
int connectionTimeoutInMs,
int thriftMaxFrameSize) {
- this.rpcSelectorThreadNum = rpcSelectorThreadNum;
- this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
this.thriftServerAwaitTimeForStopService =
thriftServerAwaitTimeForStopService;
this.isRpcThriftCompressionEnabled = isRpcThriftCompressionEnabled;
@@ -104,14 +98,6 @@ public class PipeConsensusConfig {
this.thriftMaxFrameSize = thriftMaxFrameSize;
}
- public int getRpcSelectorThreadNum() {
- return rpcSelectorThreadNum;
- }
-
- public int getRpcMinConcurrentClientNum() {
- return rpcMinConcurrentClientNum;
- }
-
public int getRpcMaxConcurrentClientNum() {
return rpcMaxConcurrentClientNum;
}
@@ -137,24 +123,12 @@ public class PipeConsensusConfig {
}
public static class Builder {
- private int rpcSelectorThreadNum = 1;
- private int rpcMinConcurrentClientNum = Runtime.getRuntime().availableProcessors();
private int rpcMaxConcurrentClientNum = 65535;
private int thriftServerAwaitTimeForStopService = 60;
private boolean isRpcThriftCompressionEnabled = false;
private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(60);
private int thriftMaxFrameSize = 536870912;
- public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
- this.rpcSelectorThreadNum = rpcSelectorThreadNum;
- return this;
- }
-
- public RPC.Builder setRpcMinConcurrentClientNum(int rpcMinConcurrentClientNum) {
- this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
- return this;
- }
-
public RPC.Builder setRpcMaxConcurrentClientNum(int
rpcMaxConcurrentClientNum) {
this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
return this;
@@ -183,8 +157,6 @@ public class PipeConsensusConfig {
public RPC build() {
return new RPC(
- rpcSelectorThreadNum,
- rpcMinConcurrentClientNum,
rpcMaxConcurrentClientNum,
thriftServerAwaitTimeForStopService,
isRpcThriftCompressionEnabled,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5e89c4ec083..d3f13d14392 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -139,12 +139,6 @@ public class IoTDBConfig {
/** ssl key Store password. */
private String keyStorePwd = "";
- /** Rpc Selector thread num */
- private int rpcSelectorThreadCount = 1;
-
- /** Min concurrent client number */
- private int rpcMinConcurrentClientNum = Runtime.getRuntime().availableProcessors();
-
/** Max concurrent client number */
private int rpcMaxConcurrentClientNum = 1000;
@@ -999,6 +993,8 @@ public class IoTDBConfig {
*/
private int maxClientNumForEachNode =
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+ private int maxIdleClientNumForEachNode = DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE;
+
/**
* Cache size of partition cache in {@link
* org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher}
@@ -1035,9 +1031,6 @@ public class IoTDBConfig {
/** ThreadPool size for read operation in coordinator */
private int coordinatorReadExecutorSize = 20;
- /** ThreadPool size for write operation in coordinator */
- private int coordinatorWriteExecutorSize = 50;
-
private int[] schemaMemoryProportion = new int[] {5, 4, 1};
/** Memory allocated for schemaRegion */
@@ -1872,22 +1865,6 @@ public class IoTDBConfig {
this.unSeqTsFileSize = unSeqTsFileSize;
}
- public int getRpcSelectorThreadCount() {
- return rpcSelectorThreadCount;
- }
-
- public void setRpcSelectorThreadCount(int rpcSelectorThreadCount) {
- this.rpcSelectorThreadCount = rpcSelectorThreadCount;
- }
-
- public int getRpcMinConcurrentClientNum() {
- return rpcMinConcurrentClientNum;
- }
-
- public void setRpcMinConcurrentClientNum(int rpcMinConcurrentClientNum) {
- this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
- }
-
public int getRpcMaxConcurrentClientNum() {
return rpcMaxConcurrentClientNum;
}
@@ -3338,6 +3315,14 @@ public class IoTDBConfig {
this.maxClientNumForEachNode = maxClientNumForEachNode;
}
+ public int getMaxIdleClientNumForEachNode() {
+ return maxIdleClientNumForEachNode;
+ }
+
+ public void setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) {
+ this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode;
+ }
+
public int getSelectorNumOfClientManager() {
return selectorNumOfClientManager;
}
@@ -3484,14 +3469,6 @@ public class IoTDBConfig {
this.coordinatorReadExecutorSize = coordinatorReadExecutorSize;
}
- public int getCoordinatorWriteExecutorSize() {
- return coordinatorWriteExecutorSize;
- }
-
- public void setCoordinatorWriteExecutorSize(int coordinatorWriteExecutorSize) {
- this.coordinatorWriteExecutorSize = coordinatorWriteExecutorSize;
- }
-
public TEndPoint getAddressAndPort() {
return new TEndPoint(rpcAddress, rpcPort);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index e965611016d..b01fca1a88f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -287,13 +287,23 @@ public class IoTDBDescriptor {
String.valueOf(conf.getMaxClientNumForEachNode()))
.trim()));
- conf.setSelectorNumOfClientManager(
+ int dnMaxIdleClientNumForEachNode =
Integer.parseInt(
- properties
- .getProperty(
- "dn_selector_thread_count_of_client_manager",
- String.valueOf(conf.getSelectorNumOfClientManager()))
- .trim()));
+ properties.getProperty(
+ "dn_max_idle_client_count_for_each_node_in_client_manager",
+ String.valueOf(conf.getMaxIdleClientNumForEachNode())));
+ if (dnMaxIdleClientNumForEachNode >= 0) {
+ conf.setMaxIdleClientNumForEachNode(dnMaxIdleClientNumForEachNode);
+ }
+
+ int dnSelectorNumOfClientManager =
+ Integer.parseInt(
+ properties.getProperty(
+ "dn_selector_thread_nums_of_client_manager",
+ String.valueOf(conf.getSelectorNumOfClientManager())));
+ if (dnSelectorNumOfClientManager > 0) {
+ conf.setSelectorNumOfClientManager(dnSelectorNumOfClientManager);
+ }
conf.setRpcPort(
Integer.parseInt(
@@ -775,28 +785,6 @@ public class IoTDBDescriptor {
properties.getProperty(
"0.13_data_insert_adapt",
String.valueOf(conf.isEnable13DataInsertAdapt()))));
- int rpcSelectorThreadNum =
- Integer.parseInt(
- properties.getProperty(
- "dn_rpc_selector_thread_count",
- Integer.toString(conf.getRpcSelectorThreadCount()).trim()));
- if (rpcSelectorThreadNum <= 0) {
- rpcSelectorThreadNum = 1;
- }
-
- conf.setRpcSelectorThreadCount(rpcSelectorThreadNum);
-
- int minConcurrentClientNum =
- Integer.parseInt(
- properties.getProperty(
- "dn_rpc_min_concurrent_client_num",
- Integer.toString(conf.getRpcMinConcurrentClientNum()).trim()));
- if (minConcurrentClientNum <= 0) {
- minConcurrentClientNum = Runtime.getRuntime().availableProcessors();
- }
-
- conf.setRpcMinConcurrentClientNum(minConcurrentClientNum);
-
int maxConcurrentClientNum =
Integer.parseInt(
properties.getProperty(
@@ -1024,11 +1012,6 @@ public class IoTDBDescriptor {
properties.getProperty(
"coordinator_read_executor_size",
Integer.toString(conf.getCoordinatorReadExecutorSize()))));
- conf.setCoordinatorWriteExecutorSize(
- Integer.parseInt(
- properties.getProperty(
- "coordinator_write_executor_size",
- Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
// Commons
commonDescriptor.loadCommonProps(properties);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index 7dbc720a4a8..fa8e1efc6d8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -123,8 +123,6 @@ public class DataRegionConsensusImpl {
.setRpc(
RPC.newBuilder()
.setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS())
- .setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount())
- .setRpcMinConcurrentClientNum(CONF.getRpcMinConcurrentClientNum())
.setRpcMaxConcurrentClientNum(CONF.getRpcMaxConcurrentClientNum())
.setRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable())
.setSelectorNumOfClientManager(CONF.getSelectorNumOfClientManager())
@@ -151,8 +149,6 @@ public class DataRegionConsensusImpl {
PipeConsensusConfig.RPC
.newBuilder()
.setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS())
- .setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount())
- .setRpcMinConcurrentClientNum(CONF.getRpcMinConcurrentClientNum())
.setRpcMaxConcurrentClientNum(CONF.getRpcMaxConcurrentClientNum())
.setIsRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable())
.setThriftServerAwaitTimeForStopService(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
index b5f5df43012..102b1d2cbad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
@@ -61,32 +61,4 @@ public class DataNodeClientPoolFactory {
return clientPool;
}
}
-
- public static class ClusterDeletionConfigNodeClientPoolFactory
- implements IClientPoolFactory<ConfigRegionId, ConfigNodeClient> {
-
- @Override
- public GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient> createClientPool(
- ClientManager<ConfigRegionId, ConfigNodeClient> manager) {
- GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient> clientPool =
- new GenericKeyedObjectPool<>(
- new ConfigNodeClient.Factory(
- manager,
- new ThriftClientProperty.Builder()
- .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS() * 10)
- .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
- .setSelectorNumOfAsyncClientManager(
- conf.getSelectorNumOfClientManager() / 10 > 0
- ? conf.getSelectorNumOfClientManager() / 10
- : 1)
- .build()),
- new ClientPoolProperty.Builder<ConfigNodeClient>()
- .setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
- .build()
- .getConfig());
- ClientManagerMetrics.getInstance()
- .registerClientManager(this.getClass().getSimpleName(), clientPool);
- return clientPool;
- }
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
index b6a218a732e..9ac6b708411 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
import
org.apache.iotdb.commons.client.request.ConfigNodeInternalServiceAsyncRequestManager;
import org.apache.iotdb.commons.client.request.TestConnectionUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +36,10 @@ public class DnToCnInternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(DnToCnInternalServiceAsyncRequestManager.class);
+ public DnToCnInternalServiceAsyncRequestManager() {
+ super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager());
+ }
+
@Override
protected void initActionMapBuilder() {
actionMapBuilder.put(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java
index dd56e1366c0..29c3f767907 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.client.async.AsyncDataNodeExternalServiceClient;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestManager;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,12 +39,17 @@ public class DataNodeExternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeExternalServiceAsyncRequestManager.class);
+ private DataNodeExternalServiceAsyncRequestManager() {
+
super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager());
+ }
+
@Override
- protected void initClientManager() {
+ protected void initClientManager(int selectorNumOfAsyncClientManager) {
clientManager =
new IClientManager.Factory<TEndPoint,
AsyncDataNodeExternalServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncDataNodeExternalServiceClientPoolFactory());
+ new
ClientPoolFactory.AsyncDataNodeExternalServiceClientPoolFactory(
+ selectorNumOfAsyncClientManager));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java
index ab08d83f264..e4d190e1571 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeService
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestManager;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,14 +38,17 @@ public class DataNodeMPPServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeMPPServiceAsyncRequestManager.class);
- public DataNodeMPPServiceAsyncRequestManager() {}
+ public DataNodeMPPServiceAsyncRequestManager() {
+
super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager());
+ }
@Override
- protected void initClientManager() {
+ protected void initClientManager(int selectorNumOfAsyncClientManager) {
clientManager =
new IClientManager.Factory<TEndPoint,
AsyncDataNodeMPPDataExchangeServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncDataNodeMPPDataExchangeServiceClientPoolFactory());
+ new
ClientPoolFactory.AsyncDataNodeMPPDataExchangeServiceClientPoolFactory(
+ selectorNumOfAsyncClientManager));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
index 88766458b64..c2ca62cf15d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
import
org.apache.iotdb.commons.client.request.DataNodeInternalServiceRequestManager;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +33,10 @@ public class DnToDnInternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(DnToDnInternalServiceAsyncRequestManager.class);
+ private DnToDnInternalServiceAsyncRequestManager() {
+
super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager());
+ }
+
@Override
protected void initActionMapBuilder() {
actionMapBuilder.put(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 8afdfce7d5c..2f173861f22 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -92,10 +92,10 @@ public class Coordinator {
ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER =
new IClientManager.Factory<TEndPoint,
AsyncDataNodeInternalServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
+ new
ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory(
+ CONFIG.getSelectorNumOfClientManager()));
private final ExecutorService executor;
- private final ExecutorService writeOperationExecutor;
private final ScheduledExecutorService scheduledExecutor;
private final ExecutorService dispatchExecutor;
@@ -109,7 +109,6 @@ public class Coordinator {
private Coordinator() {
this.queryExecutionMap = new ConcurrentHashMap<>();
this.executor = getQueryExecutor();
- this.writeOperationExecutor = getWriteExecutor();
this.scheduledExecutor = getScheduledExecutor();
int dispatchThreadNum = Math.max(20,
Runtime.getRuntime().availableProcessors() * 2);
this.dispatchExecutor =
@@ -220,8 +219,6 @@ public class Coordinator {
TreeModelPlanner treeModelPlanner =
new TreeModelPlanner(
statement,
- executor,
- writeOperationExecutor,
scheduledExecutor,
partitionFetcher,
schemaFetcher,
@@ -248,12 +245,6 @@ public class Coordinator {
coordinatorReadExecutorSize,
ThreadName.MPP_COORDINATOR_EXECUTOR_POOL.getName());
}
- private ExecutorService getWriteExecutor() {
- int coordinatorWriteExecutorSize =
CONFIG.getCoordinatorWriteExecutorSize();
- return IoTDBThreadPoolFactory.newFixedThreadPool(
- coordinatorWriteExecutorSize,
ThreadName.MPP_COORDINATOR_WRITE_EXECUTOR.getName());
- }
-
private ScheduledExecutorService getScheduledExecutor() {
return IoTDBThreadPoolFactory.newScheduledThreadPool(
COORDINATOR_SCHEDULED_EXECUTOR_SIZE,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 023f7e33793..d582832864a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -143,7 +143,6 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
-import org.apache.iotdb.db.protocol.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
@@ -306,13 +305,6 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
private static final IClientManager<ConfigRegionId, ConfigNodeClient>
CONFIG_NODE_CLIENT_MANAGER =
ConfigNodeClientManager.getInstance();
- /** FIXME Consolidate this clientManager with the upper one. */
- private static final IClientManager<ConfigRegionId, ConfigNodeClient>
- CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER =
- new IClientManager.Factory<ConfigRegionId, ConfigNodeClient>()
- .createClientManager(
- new
DataNodeClientPoolFactory.ClusterDeletionConfigNodeClientPoolFactory());
-
private static final class ClusterConfigTaskExecutorHolder {
private static final ClusterConfigTaskExecutor INSTANCE = new
ClusterConfigTaskExecutor();
@@ -1598,7 +1590,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
req.setPathPatternTree(
serializePatternListToByteBuffer(deactivateTemplateStatement.getPathPatternList()));
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -1699,7 +1691,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
alterSchemaTemplateStatement.getOperationType(),
alterSchemaTemplateStatement.getTemplateAlterInfo()));
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -1754,7 +1746,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
req.setTemplateName(unsetSchemaTemplateStatement.getTemplateName());
req.setPath(unsetSchemaTemplateStatement.getPath().getFullPath());
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -2406,7 +2398,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
queryId,
serializePatternListToByteBuffer(deleteTimeSeriesStatement.getPathPatternList()));
try (ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -2452,7 +2444,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
queryId,
serializePatternListToByteBuffer(deleteLogicalViewStatement.getPathPatternList()));
try (ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -2541,7 +2533,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
new TDeleteLogicalViewReq(
queryId,
serializePatternListToByteBuffer(Collections.singletonList(oldName)));
try (ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -2613,7 +2605,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
new TAlterLogicalViewReq(
context.getQueryId().getId(),
ByteBuffer.wrap(stream.toByteArray()));
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -2675,7 +2667,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
.setIsGeneratedByPipe(shouldMarkAsPipeRequest);
TSStatus tsStatus;
try (ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
do {
try {
tsStatus = client.alterLogicalView(req);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
index 1da701a8e35..2812d81cf27 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -48,15 +48,12 @@ import org.apache.iotdb.rpc.TSStatusCode;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
public class TreeModelPlanner implements IPlanner {
private final Statement statement;
- private final ExecutorService executor;
- private final ExecutorService writeOperationExecutor;
private final ScheduledExecutorService scheduledExecutor;
private final IPartitionFetcher partitionFetcher;
@@ -71,8 +68,6 @@ public class TreeModelPlanner implements IPlanner {
public TreeModelPlanner(
Statement statement,
- ExecutorService executor,
- ExecutorService writeOperationExecutor,
ScheduledExecutorService scheduledExecutor,
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher,
@@ -80,8 +75,6 @@ public class TreeModelPlanner implements IPlanner {
IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
asyncInternalServiceClientManager) {
this.statement = statement;
- this.executor = executor;
- this.writeOperationExecutor = writeOperationExecutor;
this.scheduledExecutor = scheduledExecutor;
this.partitionFetcher = partitionFetcher;
this.schemaFetcher = schemaFetcher;
@@ -134,8 +127,6 @@ public class TreeModelPlanner implements IPlanner {
stateMachine,
distributedPlan,
context.getQueryType(),
- executor,
- writeOperationExecutor,
scheduledExecutor,
syncInternalServiceClientManager,
asyncInternalServiceClientManager);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
index 33f1bd3a194..283e480ae38 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
@@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -74,8 +73,6 @@ public class ClusterScheduler implements IScheduler {
QueryStateMachine stateMachine,
DistributedQueryPlan distributedQueryPlan,
QueryType queryType,
- ExecutorService executor,
- ExecutorService writeOperationExecutor,
ScheduledExecutorService scheduledExecutor,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
syncInternalServiceClientManager,
IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
@@ -88,8 +85,6 @@ public class ClusterScheduler implements IScheduler {
new FragmentInstanceDispatcherImpl(
queryType,
queryContext,
- executor,
- writeOperationExecutor,
syncInternalServiceClientManager,
asyncInternalServiceClientManager);
if (queryType == QueryType.READ) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 41d325a5761..3312444f882 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -65,7 +65,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -81,8 +80,6 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
- private final ExecutorService executor;
- private final ExecutorService writeOperationExecutor;
private final QueryType type;
private final MPPQueryContext queryContext;
private final String localhostIpAddr;
@@ -104,15 +101,11 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
public FragmentInstanceDispatcherImpl(
QueryType type,
MPPQueryContext queryContext,
- ExecutorService executor,
- ExecutorService writeOperationExecutor,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
syncInternalServiceClientManager,
IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
asyncInternalServiceClientManager) {
this.type = type;
this.queryContext = queryContext;
- this.executor = executor;
- this.writeOperationExecutor = writeOperationExecutor;
this.syncInternalServiceClientManager = syncInternalServiceClientManager;
this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
this.localhostIpAddr =
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index c7862e4886e..5447bcf5fd4 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -465,9 +465,10 @@ cn_rpc_max_concurrent_client_num=3000
cn_connection_timeout_ms=60000
# Number of selector threads (TAsyncClientManager) used by async clients in a
clientManager
+# When <= 0, use max(1, CPU core number / 4).
# effectiveMode: restart
# Datatype: int
-cn_selector_thread_nums_of_client_manager=1
+cn_selector_thread_nums_of_client_manager=0
# The maximum number of clients that can be allocated for a node in a
clientManager.
# When the number of clients to a single node exceeds this number, the
thread applying for a client will be blocked
@@ -476,6 +477,14 @@ cn_selector_thread_nums_of_client_manager=1
# Datatype: int
cn_max_client_count_for_each_node_in_client_manager=1000
+# The maximum number of idle clients that can be retained for a node in a
clientManager.
+# When the number of idle clients to a single node exceeds this number, excess
idle clients will be evicted.
+# Idle clients are determined by a time threshold (default 1 minute of
inactivity).
+# 0 means no idle clients will be retained; connections are destroyed
immediately upon return.
+# effectiveMode: restart
+# Datatype: int
+# cn_max_idle_client_count_for_each_node_in_client_manager=1000
+
# The maximum session idle time. unit: ms
# Idle sessions are the ones that perform neither query nor non-query
operations for a period of time
# Set to 0 to disable session timeout
@@ -494,16 +503,6 @@ dn_rpc_thrift_compression_enable=false
# this feature is under development, set this as false before it is done.
dn_rpc_advanced_compression_enable=false
-# the number of rpc selector
-# effectiveMode: restart
-# Datatype: int
-dn_rpc_selector_thread_count=1
-
-# The min number of concurrent clients that can be connected to the dataNode.
-# effectiveMode: restart
-# Datatype: int
-dn_rpc_min_concurrent_client_num=1
-
# The maximum number of concurrent clients that can be connected to the
dataNode.
# effectiveMode: restart
# Datatype: int
@@ -525,9 +524,10 @@ dn_thrift_init_buffer_size=1024
dn_connection_timeout_ms=60000
# Number of selector threads (TAsyncClientManager) used by async clients in a
clientManager
+# When <= 0, use max(1, CPU core number / 4).
# effectiveMode: restart
# Datatype: int
-dn_selector_thread_count_of_client_manager=1
+dn_selector_thread_nums_of_client_manager=0
# The maximum number of clients that can be allocated for a node in a
clientManager.
# When the number of clients to a single node exceeds this number, the
thread applying for a client will be blocked
@@ -536,6 +536,14 @@ dn_selector_thread_count_of_client_manager=1
# Datatype: int
dn_max_client_count_for_each_node_in_client_manager=1000
+# The maximum number of idle clients that can be retained for a node in a
clientManager.
+# When the number of idle clients to a single node exceeds this number, excess
idle clients will be evicted.
+# Idle clients are determined by a time threshold (default 1 minute of
inactivity).
+# 0 means no idle clients will be retained; connections are destroyed
immediately upon return.
+# effectiveMode: restart
+# Datatype: int
+# dn_max_idle_client_count_for_each_node_in_client_manager=1000
+
####################
### REST Service Configuration
####################
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 3ff47a2c5e1..a70a6aa911d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -72,6 +72,12 @@ public class ClientPoolFactory {
public static class AsyncConfigNodeInternalServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncConfigNodeInternalServiceClient> {
+ private final int selectorNumOfAsyncClientManager;
+
+ public AsyncConfigNodeInternalServiceClientPoolFactory(int
selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
public GenericKeyedObjectPool<TEndPoint,
AsyncConfigNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient>
manager) {
@@ -82,7 +88,7 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+
.setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager)
.build(),
ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncConfigNodeInternalServiceClient>()
@@ -120,6 +126,12 @@ public class ClientPoolFactory {
public static class AsyncDataNodeInternalServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncDataNodeInternalServiceClient> {
+ private final int selectorNumOfAsyncClientManager;
+
+ public AsyncDataNodeInternalServiceClientPoolFactory(int
selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
public GenericKeyedObjectPool<TEndPoint,
AsyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
@@ -130,7 +142,8 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+
.setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager)
+ .setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
@@ -145,6 +158,12 @@ public class ClientPoolFactory {
public static class AsyncDataNodeExternalServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncDataNodeExternalServiceClient> {
+ private final int selectorNumOfAsyncClientManager;
+
+ public AsyncDataNodeExternalServiceClientPoolFactory(int
selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
public GenericKeyedObjectPool<TEndPoint,
AsyncDataNodeExternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeExternalServiceClient> manager) {
@@ -155,7 +174,7 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+
.setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager)
.build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncDataNodeExternalServiceClient>()
@@ -170,6 +189,12 @@ public class ClientPoolFactory {
public static class AsyncConfigNodeHeartbeatServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncConfigNodeInternalServiceClient> {
+ private final int selectorNumOfAsyncClientManager;
+
+ public AsyncConfigNodeHeartbeatServiceClientPoolFactory(int
selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
public GenericKeyedObjectPool<TEndPoint,
AsyncConfigNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient>
manager) {
@@ -181,7 +206,7 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+
.setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager)
.setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()),
@@ -196,6 +221,13 @@ public class ClientPoolFactory {
public static class AsyncDataNodeHeartbeatServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncDataNodeInternalServiceClient> {
+
+ private final int selectorNumOfAsyncClientManager;
+
+ public AsyncDataNodeHeartbeatServiceClientPoolFactory(int
selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
public GenericKeyedObjectPool<TEndPoint,
AsyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
@@ -206,7 +238,7 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+
.setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager)
.setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
@@ -246,6 +278,13 @@ public class ClientPoolFactory {
public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncDataNodeMPPDataExchangeServiceClient> {
+ private final int selectorNumOfAsyncClientManager;
+
+ public AsyncDataNodeMPPDataExchangeServiceClientPoolFactory(
+ int selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
public GenericKeyedObjectPool<TEndPoint,
AsyncDataNodeMPPDataExchangeServiceClient>
createClientPool(
@@ -257,7 +296,7 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+
.setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager)
.build(),
ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
@@ -323,6 +362,13 @@ public class ClientPoolFactory {
public static class AsyncAINodeHeartbeatServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, AsyncAINodeServiceClient> {
+
+ private final int selectorNumOfAsyncClientManager;
+
+ public AsyncAINodeHeartbeatServiceClientPoolFactory(int
selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
public GenericKeyedObjectPool<TEndPoint, AsyncAINodeServiceClient>
createClientPool(
ClientManager<TEndPoint, AsyncAINodeServiceClient> manager) {
@@ -333,7 +379,7 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+
.setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager)
.setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
index 5bb7c22ee49..1f818afe543 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.commons.client.property;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import java.time.Duration;
@@ -49,7 +51,11 @@ public class ClientPoolProperty<V> {
* the maximum number of clients that can be allocated for a node. When
some clients are idle
* for more than {@code maxIdleTimeForClient}, they will be cleaned up.
*/
- private int maxClientNumForEachNode =
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+ private int maxClientNumForEachNode =
+
CommonDescriptor.getInstance().getConfig().getMaxClientNumForEachNode();
+
+ private int maxIdleClientNumForEachNode =
+
CommonDescriptor.getInstance().getConfig().getMaxIdleClientNumForEachNode();
/**
* the minimum amount of time a client may sit idle in the pool before it
is eligible for
@@ -74,6 +80,11 @@ public class ClientPoolProperty<V> {
return this;
}
+ public Builder<V> setMaxIdleClientNumForEachNode(int
maxIdleClientNumForEachNode) {
+ this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode;
+ return this;
+ }
+
public Builder<V> setMinIdleTimeForClient(long minIdleTimeForClient) {
this.minIdleTimeForClient = minIdleTimeForClient;
return this;
@@ -87,7 +98,7 @@ public class ClientPoolProperty<V> {
public ClientPoolProperty<V> build() {
GenericKeyedObjectPoolConfig<V> poolConfig = new
GenericKeyedObjectPoolConfig<>();
poolConfig.setMaxTotalPerKey(maxClientNumForEachNode);
- poolConfig.setMaxIdlePerKey(maxClientNumForEachNode);
+ poolConfig.setMaxIdlePerKey(maxIdleClientNumForEachNode);
poolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(timeBetweenEvictionRuns));
poolConfig.setMinEvictableIdleTime(Duration.ofMillis(minIdleTimeForClient));
poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMs));
@@ -105,5 +116,6 @@ public class ClientPoolProperty<V> {
public static final long MIN_IDLE_TIME_FOR_CLIENT_MS =
TimeUnit.MINUTES.toMillis(1);
public static final long TIME_BETWEEN_EVICTION_RUNS_MS =
TimeUnit.MINUTES.toMillis(1);
public static final int MAX_CLIENT_NUM_FOR_EACH_NODE = 1000;
+ public static final int MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE = 1000;
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
index f8fe16166a8..f157ee7df93 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
@@ -117,7 +117,10 @@ public class ThriftClientProperty {
public static final boolean RPC_THRIFT_COMPRESSED_ENABLED = false;
public static final int CONNECTION_TIMEOUT_MS = (int)
TimeUnit.SECONDS.toMillis(20);
public static final int CONNECTION_NEVER_TIMEOUT_MS = 0;
- public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER = 1;
+ public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER =
+ Runtime.getRuntime().availableProcessors() / 4 > 0
+ ? Runtime.getRuntime().availableProcessors() / 4
+ : 1;
public static final boolean PRINT_LOG_WHEN_ENCOUNTER_EXCEPTION = true;
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
index 3cca39bb635..0290d33d3a4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
@@ -53,15 +53,15 @@ public abstract class AsyncRequestManager<RequestType, NodeLocation, Client> {
private static final int MAX_RETRY_NUM = 6;
- protected AsyncRequestManager() {
- initClientManager();
+ protected AsyncRequestManager(int selectorNumOfAsyncClientManager) {
+ initClientManager(selectorNumOfAsyncClientManager);
actionMapBuilder = ImmutableMap.builder();
initActionMapBuilder();
this.actionMap = this.actionMapBuilder.build();
checkActionMapCompleteness();
}
- protected abstract void initClientManager();
+ protected abstract void initClientManager(int selectorNumOfAsyncClientManager);
protected abstract void initActionMapBuilder();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java
index 791a1e5df0e..3b50c29fba6 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java
@@ -28,12 +28,18 @@ import org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClien
public abstract class ConfigNodeInternalServiceAsyncRequestManager<RequestType>
extends AsyncRequestManager<
RequestType, TConfigNodeLocation, AsyncConfigNodeInternalServiceClient> {
+
+ protected ConfigNodeInternalServiceAsyncRequestManager(int selectorNumOfAsyncClientManager) {
+ super(selectorNumOfAsyncClientManager);
+ }
+
@Override
- protected void initClientManager() {
+ protected void initClientManager(int selectorNumOfAsyncClientManager) {
clientManager =
new IClientManager.Factory<TEndPoint, AsyncConfigNodeInternalServiceClient>()
.createClientManager(
- new ClientPoolFactory.AsyncConfigNodeInternalServiceClientPoolFactory());
+ new ClientPoolFactory.AsyncConfigNodeInternalServiceClientPoolFactory(
+ selectorNumOfAsyncClientManager));
}
@Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
index fcb1b01857d..722d4f241eb 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
@@ -28,12 +28,18 @@ import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
public abstract class DataNodeInternalServiceRequestManager<RequestType>
extends AsyncRequestManager<
RequestType, TDataNodeLocation, AsyncDataNodeInternalServiceClient> {
+
+ protected DataNodeInternalServiceRequestManager(int selectorNumOfAsyncClientManager) {
+ super(selectorNumOfAsyncClientManager);
+ }
+
@Override
- protected void initClientManager() {
+ protected void initClientManager(int selectorNumOfAsyncClientManager) {
clientManager =
new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
.createClientManager(
- new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
+ new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory(
+ selectorNumOfAsyncClientManager));
}
@Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 696b6b8ce07..4f5ad140e99 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -44,7 +44,6 @@ public enum ThreadName {
MPP_COORDINATOR_EXECUTOR_POOL("MPP-Coordinator-Executor"),
DATANODE_INTERNAL_RPC_SERVICE("DataNodeInternalRPC-Service"),
DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"),
- MPP_COORDINATOR_WRITE_EXECUTOR("MPP-Coordinator-Write-Executor"),
ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool"),
// -------------------------- Compaction --------------------------
COMPACTION_WORKER("Compaction-Worker"),
@@ -221,7 +220,6 @@ public enum ThreadName {
MPP_COORDINATOR_EXECUTOR_POOL,
DATANODE_INTERNAL_RPC_SERVICE,
DATANODE_INTERNAL_RPC_PROCESSOR,
- MPP_COORDINATOR_WRITE_EXECUTOR,
ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL));
private static final Set<ThreadName> compactionThreadNames =
new HashSet<>(Arrays.asList(COMPACTION_WORKER, COMPACTION_SUB_TASK, COMPACTION_SCHEDULE));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 22e282d85a8..e1f52ba65ff 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -146,13 +146,18 @@ public class CommonConfig {
* ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its
* clients.
*/
- private int selectorNumOfClientManager = 1;
+ private int selectorNumOfClientManager =
+ Runtime.getRuntime().availableProcessors() / 4 > 0
+ ? Runtime.getRuntime().availableProcessors() / 4
+ : 1;
/** Whether to use thrift compression. */
private boolean isRpcThriftCompressionEnabled = false;
private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+ private int maxIdleClientNumForEachNode = DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE;
+
/** What will the system do when unrecoverable error occurs. */
private HandleSystemErrorStrategy handleSystemErrorStrategy =
HandleSystemErrorStrategy.CHANGE_TO_READ_ONLY;
@@ -630,6 +635,14 @@ public class CommonConfig {
this.maxClientNumForEachNode = maxClientNumForEachNode;
}
+ public int getMaxIdleClientNumForEachNode() {
+ return maxIdleClientNumForEachNode;
+ }
+
+ public void setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) {
+ this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode;
+ }
+
HandleSystemErrorStrategy getHandleSystemErrorStrategy() {
return handleSystemErrorStrategy;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 961d4fbb8f2..ab7d949044d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -128,13 +128,16 @@ public class CommonDescriptor {
"cn_connection_timeout_ms",
String.valueOf(config.getCnConnectionTimeoutInMS()))
.trim()));
- config.setSelectorNumOfClientManager(
+ int cnSelectorNumOfClientManager =
Integer.parseInt(
properties
.getProperty(
"cn_selector_thread_nums_of_client_manager",
String.valueOf(config.getSelectorNumOfClientManager()))
- .trim()));
+ .trim());
+ if (cnSelectorNumOfClientManager > 0) {
+ config.setSelectorNumOfClientManager(cnSelectorNumOfClientManager);
+ }
config.setMaxClientNumForEachNode(
Integer.parseInt(
@@ -144,6 +147,17 @@ public class CommonDescriptor {
String.valueOf(config.getMaxClientNumForEachNode()))
.trim()));
+ int cnMaxIdleClientNumForEachNode =
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "cn_max_idle_client_count_for_each_node_in_client_manager",
+ String.valueOf(config.getMaxIdleClientNumForEachNode()))
+ .trim());
+ if (cnMaxIdleClientNumForEachNode >= 0) {
+ config.setMaxIdleClientNumForEachNode(cnMaxIdleClientNumForEachNode);
+ }
+
config.setDnConnectionTimeoutInMS(
Integer.parseInt(
properties
@@ -159,13 +173,16 @@ public class CommonDescriptor {
String.valueOf(config.isRpcThriftCompressionEnabled()))
.trim()));
- config.setSelectorNumOfClientManager(
+ int dnSelectorNumOfClientManager =
Integer.parseInt(
properties
.getProperty(
"dn_selector_thread_nums_of_client_manager",
String.valueOf(config.getSelectorNumOfClientManager()))
- .trim()));
+ .trim());
+ if (dnSelectorNumOfClientManager > 0) {
+ config.setSelectorNumOfClientManager(dnSelectorNumOfClientManager);
+ }
config.setMaxClientNumForEachNode(
Integer.parseInt(
@@ -175,6 +192,17 @@ public class CommonDescriptor {
String.valueOf(config.getMaxClientNumForEachNode()))
.trim()));
+ int dnMaxIdleClientNumForEachNode =
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "dn_max_idle_client_count_for_each_node_in_client_manager",
+ String.valueOf(config.getMaxIdleClientNumForEachNode()))
+ .trim());
+ if (dnMaxIdleClientNumForEachNode >= 0) {
+ config.setMaxIdleClientNumForEachNode(dnMaxIdleClientNumForEachNode);
+ }
+
config.setHandleSystemErrorStrategy(
HandleSystemErrorStrategy.valueOf(
properties
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
index 70c32ca4801..7296a0b3f78 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
@@ -214,6 +214,7 @@ public class ClientManagerTest {
manager, new ThriftClientProperty.Builder().build()),
new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
.setMaxClientNumForEachNode(maxClientForEachNode)
+ .setMaxIdleClientNumForEachNode(maxClientForEachNode)
.setMinIdleTimeForClient(minIdleDuration)
.setTimeBetweenEvictionRuns(evictionRunsDuration)
.build()
@@ -294,6 +295,7 @@ public class ClientManagerTest {
manager, new ThriftClientProperty.Builder().build()),
new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
.setMaxClientNumForEachNode(maxTotalClientForEachNode)
+ .setMaxIdleClientNumForEachNode(maxTotalClientForEachNode)
.setWaitClientTimeoutMs(waitClientTimeoutMs)
.build()
.getConfig());
@@ -369,6 +371,7 @@ public class ClientManagerTest {
new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
.setWaitClientTimeoutMs(waitClientTimeoutMS)
.setMaxClientNumForEachNode(maxTotalClientForEachNode)
+ .setMaxIdleClientNumForEachNode(maxTotalClientForEachNode)
.build()
.getConfig());
}
@@ -617,7 +620,13 @@ public class ClientManagerTest {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(CONNECTION_TIMEOUT)
.build()),
- new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>().build().getConfig());
+ new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
+ .setMaxClientNumForEachNode(
+ ClientPoolProperty.DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE)
+ .setMaxIdleClientNumForEachNode(
+ ClientPoolProperty.DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE)
+ .build()
+ .getConfig());
}
}
@@ -632,7 +641,13 @@ public class ClientManagerTest {
manager,
new ThriftClientProperty.Builder().setConnectionTimeoutMs(CONNECTION_TIMEOUT).build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
- new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
+ new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
+ .setMaxClientNumForEachNode(
+ ClientPoolProperty.DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE)
+ .setMaxIdleClientNumForEachNode(
+ ClientPoolProperty.DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE)
+ .build()
+ .getConfig());
}
}
}