This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a6e717f624b Clean up dead RPC thread config and use node-specific
selectorNum (#17551)
a6e717f624b is described below
commit a6e717f624b685b67368dbc13208ecff65121019
Author: Jackie Tien <[email protected]>
AuthorDate: Thu May 7 17:51:23 2026 +0800
Clean up dead RPC thread config and use node-specific selectorNum (#17551)
---
.../org/apache/iotdb/it/env/cluster/EnvUtils.java | 17 +------
.../iotdb/it/env/cluster/config/MppBaseConfig.java | 2 +
.../async/AsyncAINodeHeartbeatClientPool.java | 4 +-
.../async/AsyncConfigNodeHeartbeatClientPool.java | 4 +-
.../async/AsyncDataNodeHeartbeatClientPool.java | 4 +-
.../CnToCnInternalServiceAsyncRequestManager.java | 8 ++-
.../CnToDnInternalServiceAsyncRequestManager.java | 5 ++
.../iotdb/confignode/conf/ConfigNodeConfig.java | 28 +++++++++++
.../confignode/conf/ConfigNodeDescriptor.java | 18 +++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 32 ++++--------
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 36 ++++++--------
.../protocol/client/DataNodeClientPoolFactory.java | 28 -----------
.../DnToCnInternalServiceAsyncRequestManager.java | 5 ++
...DataNodeExternalServiceAsyncRequestManager.java | 10 +++-
.../client/dn/DataNodeIntraHeartbeatManager.java | 5 ++
.../dn/DataNodeMPPServiceAsyncRequestManager.java | 10 ++--
.../DnToDnInternalServiceAsyncRequestManager.java | 5 ++
.../iotdb/db/queryengine/plan/Coordinator.java | 12 +----
.../config/executor/ClusterConfigTaskExecutor.java | 46 +++++++----------
.../queryengine/plan/planner/TreeModelPlanner.java | 3 --
.../conf/iotdb-system.properties.template | 27 +++++++---
.../iotdb/commons/client/ClientPoolFactory.java | 58 +++++++++++++++++++---
.../client/property/ClientPoolProperty.java | 16 +++++-
.../client/property/ThriftClientProperty.java | 5 +-
.../client/request/AsyncRequestManager.java | 6 +--
...nfigNodeInternalServiceAsyncRequestManager.java | 10 +++-
.../DataNodeInternalServiceRequestManager.java | 10 +++-
.../DataNodeIntraHeartbeatRequestManager.java | 9 +++-
.../iotdb/commons/concurrent/ThreadName.java | 2 -
.../apache/iotdb/commons/conf/CommonConfig.java | 15 +++++-
.../iotdb/commons/conf/CommonDescriptor.java | 36 ++++++++++++--
.../iotdb/commons/client/ClientManagerTest.java | 19 ++++++-
32 files changed, 324 insertions(+), 171 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
index 51fd05d8eaf..53d7140a6f4 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
@@ -163,22 +163,7 @@ public class EnvUtils {
*/
public static Map<Integer, Long> listPortOccupationUnix(final List<Integer>
ports)
throws IOException {
- return listPortOccupation(ports, "lsof -iTCP -sTCP:LISTEN -P -n", 10, 9,
1);
- }
-
- private static String getSearchAvailablePortCmd(final List<Integer> ports) {
- return SystemUtils.IS_OS_WINDOWS ? getWindowsSearchPortCmd(ports) :
getUnixSearchPortCmd(ports);
- }
-
- private static String getWindowsSearchPortCmd(final List<Integer> ports) {
- return "netstat -aon -p tcp | findStr "
- + ports.stream().map(v -> "/C:\"127.0.0.1:" + v +
"\"").collect(Collectors.joining(" "));
- }
-
- private static String getUnixSearchPortCmd(final List<Integer> ports) {
- return "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E "
- + ports.stream().map(String::valueOf).collect(Collectors.joining("|"))
- + "\"";
+ return listPortOccupation(ports, "lsof -iTCP -sTCP:LISTEN,TIME_WAIT -P
-n", 10, 9, 1);
}
private static Pair<Integer, Integer> getClusterNodesNum(final int index) {
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java
index 473d2f1b64c..975ae1a5682 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java
@@ -40,6 +40,8 @@ public abstract class MppBaseConfig {
/** Create an empty MppPersistentConfig. */
protected MppBaseConfig() {
this.properties = new Properties();
+ this.properties.setProperty("cn_selector_thread_nums_of_client_manager",
"1");
+ this.properties.setProperty("dn_selector_thread_nums_of_client_manager",
"1");
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
index 8d9081f4352..f3e1d1064ef 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncAINodeInternalServiceClient;
import
org.apache.iotdb.confignode.client.async.handlers.heartbeat.AINodeHeartbeatHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
/** Asynchronously send RPC requests to AINodes. */
public class AsyncAINodeHeartbeatClientPool {
@@ -35,7 +36,8 @@ public class AsyncAINodeHeartbeatClientPool {
clientManager =
new IClientManager.Factory<TEndPoint,
AsyncAINodeInternalServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncAINodeHeartbeatServiceClientPoolFactory());
+ new
ClientPoolFactory.AsyncAINodeHeartbeatServiceClientPoolFactory(
+
ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()));
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
index 7b6bca5d0d9..a6dffbe0eef 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
import
org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq;
public class AsyncConfigNodeHeartbeatClientPool {
@@ -34,7 +35,8 @@ public class AsyncConfigNodeHeartbeatClientPool {
clientManager =
new IClientManager.Factory<TEndPoint,
AsyncConfigNodeInternalServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncConfigNodeHeartbeatServiceClientPoolFactory());
+ new
ClientPoolFactory.AsyncConfigNodeHeartbeatServiceClientPoolFactory(
+
ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()));
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
index 324e3513027..f99a88ebc77 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import
org.apache.iotdb.confignode.client.async.handlers.audit.DataNodeWriteAuditLogHandler;
import
org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.mpp.rpc.thrift.TAuditLogReq;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
@@ -37,7 +38,8 @@ public class AsyncDataNodeHeartbeatClientPool {
clientManager =
new IClientManager.Factory<TEndPoint,
AsyncDataNodeInternalServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory());
+ new
ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory(
+
ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()));
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
index 19eaf9d9a40..00267e5a8cc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeAsyncRequestRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeTSStatusRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.SubmitTestConnectionTaskToConfigNodeRPCHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,10 @@ public class CnToCnInternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(CnToCnInternalServiceAsyncRequestManager.class);
+ public CnToCnInternalServiceAsyncRequestManager(int
selectorNumOfAsyncClientManager) {
+ super(selectorNumOfAsyncClientManager);
+ }
+
@Override
protected void initActionMapBuilder() {
actionMapBuilder.put(
@@ -71,7 +76,8 @@ public class CnToCnInternalServiceAsyncRequestManager
private static class ClientPoolHolder {
private static final CnToCnInternalServiceAsyncRequestManager INSTANCE =
- new CnToCnInternalServiceAsyncRequestManager();
+ new CnToCnInternalServiceAsyncRequestManager(
+
ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager());
private ClientPoolHolder() {
// Empty constructor
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index cd69f8b2c84..8c7b389bd3d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -48,6 +48,7 @@ import
org.apache.iotdb.confignode.client.async.handlers.rpc.TreeDeviceViewField
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterEncodingCompressorReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterTimeSeriesReq;
@@ -120,6 +121,10 @@ public class CnToDnInternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(CnToDnInternalServiceAsyncRequestManager.class);
+ private CnToDnInternalServiceAsyncRequestManager() {
+
super(ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager());
+ }
+
@SuppressWarnings("unchecked")
@Override
protected void initActionMapBuilder() {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index f305b06398d..64167ba1d2e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -145,6 +145,17 @@ public class ConfigNodeConfig {
*/
private int maxClientNumForEachNode =
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+ private int maxIdleClientNumForEachNode =
DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE;
+
+ /**
+ * ClientManager will have so many selector threads (TAsyncClientManager) to
distribute to its
+ * clients.
+ */
+ private int selectorNumOfClientManager =
+ Runtime.getRuntime().availableProcessors() / 4 > 0
+ ? Runtime.getRuntime().availableProcessors() / 4
+ : 1;
+
/** System directory, including version file for each database and metadata.
*/
private String systemDir =
IoTDBConstant.CN_DEFAULT_DATA_DIR + File.separator +
IoTDBConstant.SYSTEM_FOLDER_NAME;
@@ -460,6 +471,23 @@ public class ConfigNodeConfig {
return this;
}
+ public int getMaxIdleClientNumForEachNode() {
+ return maxIdleClientNumForEachNode;
+ }
+
+ public ConfigNodeConfig setMaxIdleClientNumForEachNode(int
maxIdleClientNumForEachNode) {
+ this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode;
+ return this;
+ }
+
+ public int getSelectorNumOfClientManager() {
+ return selectorNumOfClientManager;
+ }
+
+ public void setSelectorNumOfClientManager(int selectorNumOfClientManager) {
+ this.selectorNumOfClientManager = selectorNumOfClientManager;
+ }
+
public String getConsensusDir() {
return consensusDir;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 77790dae1a9..fa35565ff51 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -271,6 +271,24 @@ public class ConfigNodeDescriptor {
"cn_max_client_count_for_each_node_in_client_manager",
String.valueOf(conf.getMaxClientNumForEachNode()))));
+ int cnMaxIdleClientNumForEachNode =
+ Integer.parseInt(
+ properties.getProperty(
+ "cn_max_idle_client_count_for_each_node_in_client_manager",
+ String.valueOf(conf.getMaxIdleClientNumForEachNode())));
+ if (cnMaxIdleClientNumForEachNode >= 0) {
+ conf.setMaxIdleClientNumForEachNode(cnMaxIdleClientNumForEachNode);
+ }
+
+ int cnSelectorNumOfClientManager =
+ Integer.parseInt(
+ properties.getProperty(
+ "cn_selector_thread_nums_of_client_manager",
+ String.valueOf(conf.getSelectorNumOfClientManager())));
+ if (cnSelectorNumOfClientManager > 0) {
+ conf.setSelectorNumOfClientManager(cnSelectorNumOfClientManager);
+ }
+
conf.setSystemDir(properties.getProperty("cn_system_dir",
conf.getSystemDir()));
conf.setConsensusDir(properties.getProperty("cn_consensus_dir",
conf.getConsensusDir()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c7fc72152e2..9b177cffcfa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -137,9 +137,6 @@ public class IoTDBConfig {
/** Port which the JDBC server listens to. */
private int rpcPort = 6667;
- /** Rpc Selector thread num */
- private int rpcSelectorThreadCount = 1;
-
/** Max concurrent client number */
private int rpcMaxConcurrentClientNum = 1000;
@@ -953,6 +950,8 @@ public class IoTDBConfig {
*/
private int maxClientNumForEachNode =
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+ private int maxIdleClientNumForEachNode =
DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE;
+
/**
* Cache size of partition cache in {@link
* org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher}
@@ -989,9 +988,6 @@ public class IoTDBConfig {
/** ThreadPool size for read operation in coordinator */
private int coordinatorReadExecutorSize = 20;
- /** ThreadPool size for write operation in coordinator */
- private int coordinatorWriteExecutorSize = 50;
-
/** Policy of DataNodeSchemaCache eviction */
private String dataNodeSchemaCacheEvictionPolicy = "FIFO";
@@ -1823,14 +1819,6 @@ public class IoTDBConfig {
this.unSeqTsFileSize = unSeqTsFileSize;
}
- public int getRpcSelectorThreadCount() {
- return rpcSelectorThreadCount;
- }
-
- public void setRpcSelectorThreadCount(int rpcSelectorThreadCount) {
- this.rpcSelectorThreadCount = rpcSelectorThreadCount;
- }
-
public int getRpcMaxConcurrentClientNum() {
return rpcMaxConcurrentClientNum;
}
@@ -3202,6 +3190,14 @@ public class IoTDBConfig {
this.maxClientNumForEachNode = maxClientNumForEachNode;
}
+ public int getMaxIdleClientNumForEachNode() {
+ return maxIdleClientNumForEachNode;
+ }
+
+ public void setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) {
+ this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode;
+ }
+
public int getSelectorNumOfClientManager() {
return selectorNumOfClientManager;
}
@@ -3349,14 +3345,6 @@ public class IoTDBConfig {
this.coordinatorReadExecutorSize = coordinatorReadExecutorSize;
}
- public int getCoordinatorWriteExecutorSize() {
- return coordinatorWriteExecutorSize;
- }
-
- public void setCoordinatorWriteExecutorSize(int
coordinatorWriteExecutorSize) {
- this.coordinatorWriteExecutorSize = coordinatorWriteExecutorSize;
- }
-
public TEndPoint getAddressAndPort() {
return new TEndPoint(rpcAddress, rpcPort);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 065583a383c..7193fd27c58 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -291,11 +291,23 @@ public class IoTDBDescriptor {
"dn_max_client_count_for_each_node_in_client_manager",
String.valueOf(conf.getMaxClientNumForEachNode()))));
- conf.setSelectorNumOfClientManager(
+ int dnMaxIdleClientNumForEachNode =
Integer.parseInt(
properties.getProperty(
- "dn_selector_thread_count_of_client_manager",
- String.valueOf(conf.getSelectorNumOfClientManager()))));
+ "dn_max_idle_client_count_for_each_node_in_client_manager",
+ String.valueOf(conf.getMaxIdleClientNumForEachNode())));
+ if (dnMaxIdleClientNumForEachNode >= 0) {
+ conf.setMaxIdleClientNumForEachNode(dnMaxIdleClientNumForEachNode);
+ }
+
+ int dnSelectorNumOfClientManager =
+ Integer.parseInt(
+ properties.getProperty(
+ "dn_selector_thread_nums_of_client_manager",
+ String.valueOf(conf.getSelectorNumOfClientManager())));
+ if (dnSelectorNumOfClientManager > 0) {
+ conf.setSelectorNumOfClientManager(dnSelectorNumOfClientManager);
+ }
conf.setRpcPort(
Integer.parseInt(
@@ -724,18 +736,6 @@ public class IoTDBDescriptor {
properties.getProperty(
"0.13_data_insert_adapt",
String.valueOf(conf.isEnable13DataInsertAdapt()))));
- int rpcSelectorThreadNum =
- Integer.parseInt(
- properties.getProperty(
- "dn_rpc_selector_thread_count",
- Integer.toString(conf.getRpcSelectorThreadCount())));
-
- if (rpcSelectorThreadNum <= 0) {
- rpcSelectorThreadNum = 1;
- }
-
- conf.setRpcSelectorThreadCount(rpcSelectorThreadNum);
-
int maxConcurrentClientNum =
Integer.parseInt(
properties.getProperty(
@@ -980,12 +980,6 @@ public class IoTDBDescriptor {
properties.getProperty(
"coordinator_read_executor_size",
Integer.toString(conf.getCoordinatorReadExecutorSize()))));
- conf.setCoordinatorWriteExecutorSize(
- Integer.parseInt(
- properties.getProperty(
- "coordinator_write_executor_size",
- Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
-
conf.setDataNodeTableSchemaCacheSize(
Long.parseLong(
properties.getProperty(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
index da0d84d8466..c3dd465f4b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
@@ -63,34 +63,6 @@ public class DataNodeClientPoolFactory {
}
}
- public static class ClusterDeletionConfigNodeClientPoolFactory
- implements IClientPoolFactory<ConfigRegionId, ConfigNodeClient> {
-
- @Override
- public GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient>
createClientPool(
- ClientManager<ConfigRegionId, ConfigNodeClient> manager) {
- GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient> clientPool =
- new GenericKeyedObjectPool<>(
- new ConfigNodeClient.Factory(
- manager,
- new ThriftClientProperty.Builder()
- .setConnectionTimeoutMs(CONF.getConnectionTimeoutInMS()
* 10)
-
.setRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable())
- .setSelectorNumOfAsyncClientManager(
- CONF.getSelectorNumOfClientManager() / 10 > 0
- ? CONF.getSelectorNumOfClientManager() / 10
- : 1)
- .build()),
- new ClientPoolProperty.Builder<ConfigNodeClient>()
-
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
- .build()
- .getConfig());
- ClientManagerMetrics.getInstance()
- .registerClientManager(this.getClass().getSimpleName(), clientPool);
- return clientPool;
- }
- }
-
public static class AINodeClientPoolFactory implements
IClientPoolFactory<Integer, AINodeClient> {
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
index b6a218a732e..9ac6b708411 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
import
org.apache.iotdb.commons.client.request.ConfigNodeInternalServiceAsyncRequestManager;
import org.apache.iotdb.commons.client.request.TestConnectionUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +36,10 @@ public class DnToCnInternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(DnToCnInternalServiceAsyncRequestManager.class);
+ public DnToCnInternalServiceAsyncRequestManager() {
+
super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager());
+ }
+
@Override
protected void initActionMapBuilder() {
actionMapBuilder.put(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java
index dd56e1366c0..29c3f767907 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.client.async.AsyncDataNodeExternalServiceClient;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestManager;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,12 +39,17 @@ public class DataNodeExternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeExternalServiceAsyncRequestManager.class);
+ private DataNodeExternalServiceAsyncRequestManager() {
+
super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager());
+ }
+
@Override
- protected void initClientManager() {
+ protected void initClientManager(int selectorNumOfAsyncClientManager) {
clientManager =
new IClientManager.Factory<TEndPoint,
AsyncDataNodeExternalServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncDataNodeExternalServiceClientPoolFactory());
+ new
ClientPoolFactory.AsyncDataNodeExternalServiceClientPoolFactory(
+ selectorNumOfAsyncClientManager));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeIntraHeartbeatManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeIntraHeartbeatManager.java
index d0ba1ba389f..b421f9b20da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeIntraHeartbeatManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeIntraHeartbeatManager.java
@@ -23,10 +23,15 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
import
org.apache.iotdb.commons.client.request.DataNodeIntraHeartbeatRequestManager;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
public class DataNodeIntraHeartbeatManager
extends DataNodeIntraHeartbeatRequestManager<DnToDnRequestType> {
+ public DataNodeIntraHeartbeatManager() {
+
super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager());
+ }
+
@Override
protected void initActionMapBuilder() {
actionMapBuilder.put(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java
index ab08d83f264..e4d190e1571 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeService
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestManager;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,14 +38,17 @@ public class DataNodeMPPServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeMPPServiceAsyncRequestManager.class);
- public DataNodeMPPServiceAsyncRequestManager() {}
+ public DataNodeMPPServiceAsyncRequestManager() {
+
super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager());
+ }
@Override
- protected void initClientManager() {
+ protected void initClientManager(int selectorNumOfAsyncClientManager) {
clientManager =
new IClientManager.Factory<TEndPoint,
AsyncDataNodeMPPDataExchangeServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncDataNodeMPPDataExchangeServiceClientPoolFactory());
+ new
ClientPoolFactory.AsyncDataNodeMPPDataExchangeServiceClientPoolFactory(
+ selectorNumOfAsyncClientManager));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
index b57a10bcb43..29dfbed203f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
import
org.apache.iotdb.commons.client.request.DataNodeInternalServiceRequestManager;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.mpp.rpc.thrift.TAttributeUpdateReq;
import org.slf4j.Logger;
@@ -33,6 +34,10 @@ public class DnToDnInternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(DnToDnInternalServiceAsyncRequestManager.class);
+ private DnToDnInternalServiceAsyncRequestManager() {
+
super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager());
+ }
+
@Override
protected void initActionMapBuilder() {
actionMapBuilder.put(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index d0c256a34c3..7cf6be0d49d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -216,10 +216,10 @@ public class Coordinator {
ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER =
new IClientManager.Factory<TEndPoint,
AsyncDataNodeInternalServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
+ new
ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory(
+ CONFIG.getSelectorNumOfClientManager()));
private final ExecutorService executor;
- private final ExecutorService writeOperationExecutor;
private final ScheduledExecutorService scheduledExecutor;
private final ExecutorService dispatchExecutor;
@@ -276,7 +276,6 @@ public class Coordinator {
this.queryExecutionMap = new ConcurrentHashMap<>();
this.typeManager = new InternalTypeManager();
this.executor = getQueryExecutor();
- this.writeOperationExecutor = getWriteExecutor();
this.scheduledExecutor = getScheduledExecutor();
int dispatchThreadNum = Math.max(20,
Runtime.getRuntime().availableProcessors() * 2);
this.dispatchExecutor =
@@ -410,7 +409,6 @@ public class Coordinator {
new TreeModelPlanner(
statement,
executor,
- writeOperationExecutor,
scheduledExecutor,
partitionFetcher,
schemaFetcher,
@@ -791,12 +789,6 @@ public class Coordinator {
coordinatorReadExecutorSize,
ThreadName.MPP_COORDINATOR_EXECUTOR_POOL.getName());
}
- private ExecutorService getWriteExecutor() {
- int coordinatorWriteExecutorSize =
CONFIG.getCoordinatorWriteExecutorSize();
- return IoTDBThreadPoolFactory.newFixedThreadPool(
- coordinatorWriteExecutorSize,
ThreadName.MPP_COORDINATOR_WRITE_EXECUTOR.getName());
- }
-
private ScheduledExecutorService getScheduledExecutor() {
return IoTDBThreadPoolFactory.newScheduledThreadPool(
COORDINATOR_SCHEDULED_EXECUTOR_SIZE,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 3c156eb872b..ab823966de8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -188,7 +188,6 @@ import
org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
-import org.apache.iotdb.db.protocol.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.protocol.client.an.AINodeClient;
import org.apache.iotdb.db.protocol.client.an.AINodeClientManager;
import org.apache.iotdb.db.protocol.session.IClientSession;
@@ -406,13 +405,6 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
private static final IClientManager<Integer, AINodeClient>
AI_NODE_CLIENT_MANAGER =
AINodeClientManager.getInstance();
- /** FIXME Consolidate this clientManager with the upper one. */
- private static final IClientManager<ConfigRegionId, ConfigNodeClient>
- CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER =
- new IClientManager.Factory<ConfigRegionId, ConfigNodeClient>()
- .createClientManager(
- new
DataNodeClientPoolFactory.ClusterDeletionConfigNodeClientPoolFactory());
-
private static final class ClusterConfigTaskExecutorHolder {
private static final ClusterConfigTaskExecutor INSTANCE = new
ClusterConfigTaskExecutor();
@@ -1987,7 +1979,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
req.setPathPatternTree(
serializePatternListToByteBuffer(deactivateTemplateStatement.getPathPatternList()));
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -2079,7 +2071,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
alterSchemaTemplateStatement.getOperationType(),
alterSchemaTemplateStatement.getTemplateAlterInfo()));
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -2130,7 +2122,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
req.setTemplateName(unsetSchemaTemplateStatement.getTemplateName());
req.setPath(unsetSchemaTemplateStatement.getPath().getFullPath());
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -2967,7 +2959,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
alterEncodingCompressorStatement.ifExists(),
alterEncodingCompressorStatement.isWithAudit());
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -3010,7 +3002,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
serializePatternListToByteBuffer(deleteTimeSeriesStatement.getPathPatternList()));
req.setMayDeleteAudit(deleteTimeSeriesStatement.isMayDeleteAudit());
try (ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -3052,7 +3044,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
queryId,
serializePatternListToByteBuffer(deleteLogicalViewStatement.getPathPatternList()));
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -3138,7 +3130,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
new TDeleteLogicalViewReq(
queryId,
serializePatternListToByteBuffer(Collections.singletonList(oldName)));
try (ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -3210,7 +3202,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
new TAlterLogicalViewReq(
context.getQueryId().getId(),
ByteBuffer.wrap(stream.toByteArray()));
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TSStatus tsStatus;
do {
try {
@@ -3268,7 +3260,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
.setIsGeneratedByPipe(shouldMarkAsPipeRequest);
TSStatus tsStatus;
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
do {
try {
tsStatus = client.alterLogicalView(req);
@@ -4422,7 +4414,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
final boolean isView) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final ByteArrayOutputStream stream = new ByteArrayOutputStream();
try {
@@ -4466,7 +4458,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
final boolean isView) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSStatus tsStatus =
sendAlterReq2ConfigNode(
@@ -4505,7 +4497,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
final boolean isView) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSStatus tsStatus =
sendAlterReq2ConfigNode(
@@ -4544,7 +4536,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
final boolean isView) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final ByteArrayOutputStream stream = new ByteArrayOutputStream();
try {
@@ -4590,7 +4582,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
final boolean isView) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final ByteArrayOutputStream stream = new ByteArrayOutputStream();
try {
@@ -4634,7 +4626,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
final boolean isView) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final ByteArrayOutputStream stream = new ByteArrayOutputStream();
try {
@@ -4676,7 +4668,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
final boolean isView) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final ByteArrayOutputStream stream = new ByteArrayOutputStream();
try {
@@ -4720,7 +4712,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
final boolean isView) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final ByteArrayOutputStream stream = new ByteArrayOutputStream();
try {
@@ -4764,7 +4756,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
final boolean isView) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TSStatus tsStatus =
sendAlterReq2ConfigNode(
@@ -4799,7 +4791,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
try (final ConfigNodeClient client =
-
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final ByteArrayOutputStream patternStream = new ByteArrayOutputStream();
try (final DataOutputStream outputStream = new
DataOutputStream(patternStream)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
index f2d901c1540..756f17148ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -59,7 +59,6 @@ public class TreeModelPlanner implements IPlanner {
private final Statement statement;
private final ExecutorService executor;
- private final ExecutorService writeOperationExecutor;
private final ScheduledExecutorService scheduledExecutor;
private final IPartitionFetcher partitionFetcher;
@@ -75,7 +74,6 @@ public class TreeModelPlanner implements IPlanner {
public TreeModelPlanner(
Statement statement,
ExecutorService executor,
- ExecutorService writeOperationExecutor,
ScheduledExecutorService scheduledExecutor,
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher,
@@ -84,7 +82,6 @@ public class TreeModelPlanner implements IPlanner {
asyncInternalServiceClientManager) {
this.statement = statement;
this.executor = executor;
- this.writeOperationExecutor = writeOperationExecutor;
this.scheduledExecutor = scheduledExecutor;
this.partitionFetcher = partitionFetcher;
this.schemaFetcher = schemaFetcher;
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 87be30f4520..54d9ccaf5ce 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -494,9 +494,10 @@ cn_rpc_max_concurrent_client_num=3000
cn_connection_timeout_ms=60000
# selector thread (TAsyncClientManager) nums for async thread in a clientManager
+# When <= 0, use max(1, CPU core number / 4).
# effectiveMode: restart
# Datatype: int
-cn_selector_thread_nums_of_client_manager=1
+cn_selector_thread_nums_of_client_manager=0
# The maximum number of clients that can be allocated for a node in a clientManager.
# when the number of the client to a single node exceeds this number, the thread for applying for a client will be blocked
@@ -505,6 +506,14 @@ cn_selector_thread_nums_of_client_manager=1
# Datatype: int
cn_max_client_count_for_each_node_in_client_manager=1000
+# The maximum number of idle clients that can be retained for a node in a clientManager.
+# When the number of idle clients to a single node exceeds this number, excess idle clients will be evicted.
+# Idle clients are determined by a time threshold (default 1 minute of inactivity).
+# 0 means no idle clients will be retained, connections are destroyed immediately upon return.
+# effectiveMode: restart
+# Datatype: int
+# cn_max_idle_client_count_for_each_node_in_client_manager=1000
+
# The maximum session idle time. unit: ms
# Idle sessions are the ones that performs neither query or non-query operations for a period of time
# Set to 0 to disable session timeout
@@ -523,11 +532,6 @@ dn_rpc_thrift_compression_enable=false
# this feature is under development, set this as false before it is done.
dn_rpc_advanced_compression_enable=false
-# the number of rpc selector
-# effectiveMode: restart
-# Datatype: int
-dn_rpc_selector_thread_count=1
-
# The maximum number of concurrent clients that can be connected to the dataNode.
# effectiveMode: restart
# Datatype: int
@@ -549,9 +553,10 @@ dn_thrift_init_buffer_size=1024
dn_connection_timeout_ms=60000
# selector thread (TAsyncClientManager) nums for async thread in a clientManager
+# When <= 0, use max(1, CPU core number / 4).
# effectiveMode: restart
# Datatype: int
-dn_selector_thread_count_of_client_manager=1
+dn_selector_thread_nums_of_client_manager=0
# The maximum number of clients that can be allocated for a node in a clientManager.
# When the number of the client to a single node exceeds this number, the thread for applying for a client will be blocked
@@ -560,6 +565,14 @@ dn_selector_thread_count_of_client_manager=1
# Datatype: int
dn_max_client_count_for_each_node_in_client_manager=1000
+# The maximum number of idle clients that can be retained for a node in a clientManager.
+# When the number of idle clients to a single node exceeds this number, excess idle clients will be evicted.
+# Idle clients are determined by a time threshold (default 1 minute of inactivity).
+# 0 means no idle clients will be retained, connections are destroyed immediately upon return.
+# effectiveMode: restart
+# Datatype: int
+# dn_max_idle_client_count_for_each_node_in_client_manager=1000
+
####################
### REST Service Configuration
####################
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index ea20f9c76cc..ce5c8366889 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -72,6 +72,12 @@ public class ClientPoolFactory {
public static class AsyncConfigNodeInternalServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncConfigNodeInternalServiceClient> {
+ private final int selectorNumOfAsyncClientManager;
+
+ public AsyncConfigNodeInternalServiceClientPoolFactory(int
selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
public GenericKeyedObjectPool<TEndPoint,
AsyncConfigNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient>
manager) {
@@ -82,7 +88,7 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+
.setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager)
.build(),
ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncConfigNodeInternalServiceClient>()
@@ -120,6 +126,12 @@ public class ClientPoolFactory {
public static class AsyncDataNodeInternalServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncDataNodeInternalServiceClient> {
+ private final int selectorNumOfAsyncClientManager;
+
+ public AsyncDataNodeInternalServiceClientPoolFactory(int
selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
public GenericKeyedObjectPool<TEndPoint,
AsyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
@@ -130,7 +142,7 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+
.setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager)
.setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
@@ -146,6 +158,12 @@ public class ClientPoolFactory {
public static class AsyncDataNodeExternalServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncDataNodeExternalServiceClient> {
+ private final int selectorNumOfAsyncClientManager;
+
+ public AsyncDataNodeExternalServiceClientPoolFactory(int
selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
public GenericKeyedObjectPool<TEndPoint,
AsyncDataNodeExternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeExternalServiceClient> manager) {
@@ -156,7 +174,7 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+
.setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager)
.build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncDataNodeExternalServiceClient>()
@@ -171,6 +189,12 @@ public class ClientPoolFactory {
public static class AsyncConfigNodeHeartbeatServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncConfigNodeInternalServiceClient> {
+ private final int selectorNumOfAsyncClientManager;
+
+ public AsyncConfigNodeHeartbeatServiceClientPoolFactory(int
selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
public GenericKeyedObjectPool<TEndPoint,
AsyncConfigNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient>
manager) {
@@ -182,7 +206,7 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+
.setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager)
.setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()),
@@ -197,6 +221,13 @@ public class ClientPoolFactory {
public static class AsyncDataNodeHeartbeatServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncDataNodeInternalServiceClient> {
+
+ private final int selectorNumOfAsyncClientManager;
+
+ public AsyncDataNodeHeartbeatServiceClientPoolFactory(int
selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
public GenericKeyedObjectPool<TEndPoint,
AsyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
@@ -207,7 +238,7 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+
.setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager)
.setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
@@ -247,6 +278,13 @@ public class ClientPoolFactory {
public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncDataNodeMPPDataExchangeServiceClient> {
+ private final int selectorNumOfAsyncClientManager;
+
+ public AsyncDataNodeMPPDataExchangeServiceClientPoolFactory(
+ int selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
public GenericKeyedObjectPool<TEndPoint,
AsyncDataNodeMPPDataExchangeServiceClient>
createClientPool(
@@ -258,7 +296,7 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+
.setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager)
.build(),
ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
@@ -414,6 +452,12 @@ public class ClientPoolFactory {
public static class AsyncAINodeHeartbeatServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncAINodeInternalServiceClient> {
+ private final int selectorNumOfAsyncClientManager;
+
+ public AsyncAINodeHeartbeatServiceClientPoolFactory(int
selectorNumOfAsyncClientManager) {
+ this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager;
+ }
+
@Override
public GenericKeyedObjectPool<TEndPoint, AsyncAINodeInternalServiceClient>
createClientPool(
ClientManager<TEndPoint, AsyncAINodeInternalServiceClient> manager) {
@@ -424,7 +468,7 @@ public class ClientPoolFactory {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+
.setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager)
.setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
index 5bb7c22ee49..1f818afe543 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.commons.client.property;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import java.time.Duration;
@@ -49,7 +51,11 @@ public class ClientPoolProperty<V> {
* the maximum number of clients that can be allocated for a node. When
some clients are idle
* for more than {@code maxIdleTimeForClient}, they will be cleaned up.
*/
- private int maxClientNumForEachNode =
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+ private int maxClientNumForEachNode =
+
CommonDescriptor.getInstance().getConfig().getMaxClientNumForEachNode();
+
+ private int maxIdleClientNumForEachNode =
+
CommonDescriptor.getInstance().getConfig().getMaxIdleClientNumForEachNode();
/**
* the minimum amount of time a client may sit idle in the pool before it
is eligible for
@@ -74,6 +80,11 @@ public class ClientPoolProperty<V> {
return this;
}
+ public Builder<V> setMaxIdleClientNumForEachNode(int
maxIdleClientNumForEachNode) {
+ this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode;
+ return this;
+ }
+
public Builder<V> setMinIdleTimeForClient(long minIdleTimeForClient) {
this.minIdleTimeForClient = minIdleTimeForClient;
return this;
@@ -87,7 +98,7 @@ public class ClientPoolProperty<V> {
public ClientPoolProperty<V> build() {
GenericKeyedObjectPoolConfig<V> poolConfig = new
GenericKeyedObjectPoolConfig<>();
poolConfig.setMaxTotalPerKey(maxClientNumForEachNode);
- poolConfig.setMaxIdlePerKey(maxClientNumForEachNode);
+ poolConfig.setMaxIdlePerKey(maxIdleClientNumForEachNode);
poolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(timeBetweenEvictionRuns));
poolConfig.setMinEvictableIdleTime(Duration.ofMillis(minIdleTimeForClient));
poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMs));
@@ -105,5 +116,6 @@ public class ClientPoolProperty<V> {
public static final long MIN_IDLE_TIME_FOR_CLIENT_MS =
TimeUnit.MINUTES.toMillis(1);
public static final long TIME_BETWEEN_EVICTION_RUNS_MS =
TimeUnit.MINUTES.toMillis(1);
public static final int MAX_CLIENT_NUM_FOR_EACH_NODE = 1000;
+ public static final int MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE = 1000;
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
index f8fe16166a8..f157ee7df93 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
@@ -117,7 +117,10 @@ public class ThriftClientProperty {
public static final boolean RPC_THRIFT_COMPRESSED_ENABLED = false;
public static final int CONNECTION_TIMEOUT_MS = (int)
TimeUnit.SECONDS.toMillis(20);
public static final int CONNECTION_NEVER_TIMEOUT_MS = 0;
- public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER = 1;
+ public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER =
+ Runtime.getRuntime().availableProcessors() / 4 > 0
+ ? Runtime.getRuntime().availableProcessors() / 4
+ : 1;
public static final boolean PRINT_LOG_WHEN_ENCOUNTER_EXCEPTION = true;
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
index 8053b677c78..5c069368e41 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
@@ -53,15 +53,15 @@ public abstract class AsyncRequestManager<RequestType,
NodeLocation, Client> {
private static final int MAX_RETRY_NUM = 6;
- protected AsyncRequestManager() {
- initClientManager();
+ protected AsyncRequestManager(int selectorNumOfAsyncClientManager) {
+ initClientManager(selectorNumOfAsyncClientManager);
actionMapBuilder = ImmutableMap.builder();
initActionMapBuilder();
this.actionMap = this.actionMapBuilder.build();
checkActionMapCompleteness();
}
- protected abstract void initClientManager();
+ protected abstract void initClientManager(int
selectorNumOfAsyncClientManager);
protected abstract void initActionMapBuilder();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java
index 791a1e5df0e..3b50c29fba6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java
@@ -28,12 +28,18 @@ import
org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClien
public abstract class ConfigNodeInternalServiceAsyncRequestManager<RequestType>
extends AsyncRequestManager<
RequestType, TConfigNodeLocation,
AsyncConfigNodeInternalServiceClient> {
+
+ protected ConfigNodeInternalServiceAsyncRequestManager(int
selectorNumOfAsyncClientManager) {
+ super(selectorNumOfAsyncClientManager);
+ }
+
@Override
- protected void initClientManager() {
+ protected void initClientManager(int selectorNumOfAsyncClientManager) {
clientManager =
new IClientManager.Factory<TEndPoint,
AsyncConfigNodeInternalServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncConfigNodeInternalServiceClientPoolFactory());
+ new
ClientPoolFactory.AsyncConfigNodeInternalServiceClientPoolFactory(
+ selectorNumOfAsyncClientManager));
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
index fcb1b01857d..722d4f241eb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java
@@ -28,12 +28,18 @@ import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
public abstract class DataNodeInternalServiceRequestManager<RequestType>
extends AsyncRequestManager<
RequestType, TDataNodeLocation, AsyncDataNodeInternalServiceClient> {
+
+ protected DataNodeInternalServiceRequestManager(int
selectorNumOfAsyncClientManager) {
+ super(selectorNumOfAsyncClientManager);
+ }
+
@Override
- protected void initClientManager() {
+ protected void initClientManager(int selectorNumOfAsyncClientManager) {
clientManager =
new IClientManager.Factory<TEndPoint,
AsyncDataNodeInternalServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
+ new
ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory(
+ selectorNumOfAsyncClientManager));
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeIntraHeartbeatRequestManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeIntraHeartbeatRequestManager.java
index f00de855eb8..34e44e5c8cf 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeIntraHeartbeatRequestManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeIntraHeartbeatRequestManager.java
@@ -29,12 +29,17 @@ public abstract class
DataNodeIntraHeartbeatRequestManager<RequestType>
extends AsyncRequestManager<
RequestType, TDataNodeLocation, AsyncDataNodeInternalServiceClient> {
+ protected DataNodeIntraHeartbeatRequestManager(int
selectorNumOfAsyncClientManager) {
+ super(selectorNumOfAsyncClientManager);
+ }
+
@Override
- protected void initClientManager() {
+ protected void initClientManager(int selectorNumOfAsyncClientManager) {
clientManager =
new IClientManager.Factory<TEndPoint,
AsyncDataNodeInternalServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory());
+ new
ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory(
+ selectorNumOfAsyncClientManager));
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 20fa9d78d59..b08416bd8c6 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -45,7 +45,6 @@ public enum ThreadName {
MPP_COORDINATOR_EXECUTOR_POOL("MPP-Coordinator-Executor"),
DATANODE_INTERNAL_RPC_SERVICE("DataNodeInternalRPC-Service"),
DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"),
- MPP_COORDINATOR_WRITE_EXECUTOR("MPP-Coordinator-Write-Executor"),
ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool"),
// -------------------------- Compaction --------------------------
COMPACTION_WORKER("Compaction-Worker"),
@@ -231,7 +230,6 @@ public enum ThreadName {
MPP_COORDINATOR_EXECUTOR_POOL,
DATANODE_INTERNAL_RPC_SERVICE,
DATANODE_INTERNAL_RPC_PROCESSOR,
- MPP_COORDINATOR_WRITE_EXECUTOR,
ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL));
private static final Set<ThreadName> compactionThreadNames =
new HashSet<>(Arrays.asList(COMPACTION_WORKER, COMPACTION_SUB_TASK,
COMPACTION_SCHEDULE));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 19313723e55..e34d5804cac 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -153,13 +153,18 @@ public class CommonConfig {
* ClientManager will have so many selector threads (TAsyncClientManager) to
distribute to its
* clients.
*/
- private int selectorNumOfClientManager = 1;
+ private int selectorNumOfClientManager =
+ // Read availableProcessors() once (the JVM may report a different value on a
+ // later call): a quarter of the cores, but never fewer than one selector thread.
+ Math.max(1, Runtime.getRuntime().availableProcessors() / 4);
/** Whether to use thrift compression. */
private boolean isRpcThriftCompressionEnabled = false;
private int maxClientNumForEachNode =
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+ private int maxIdleClientNumForEachNode =
DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE;
+
/** What will the system do when unrecoverable error occurs. */
private HandleSystemErrorStrategy handleSystemErrorStrategy =
HandleSystemErrorStrategy.CHANGE_TO_READ_ONLY;
@@ -713,6 +718,14 @@ public class CommonConfig {
this.maxClientNumForEachNode = maxClientNumForEachNode;
}
+ public int getMaxIdleClientNumForEachNode() {
+ return maxIdleClientNumForEachNode; // starts as DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE until the setter replaces it
+ }
+
+ public void setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) {
+ this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode; // stored as-is, no range check; the CommonDescriptor calls in this patch only pass values >= 0
+ }
+
HandleSystemErrorStrategy getHandleSystemErrorStrategy() { // no access modifier: package-private
return handleSystemErrorStrategy; // field is initialised to HandleSystemErrorStrategy.CHANGE_TO_READ_ONLY
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index d392a60bbbd..5cd954a09f7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -139,13 +139,16 @@ public class CommonDescriptor {
"cn_connection_timeout_ms",
String.valueOf(config.getCnConnectionTimeoutInMS()))
.trim()));
- config.setSelectorNumOfClientManager(
+ int cnSelectorNumOfClientManager =
Integer.parseInt(
properties
.getProperty(
"cn_selector_thread_nums_of_client_manager",
String.valueOf(config.getSelectorNumOfClientManager()))
- .trim()));
+ .trim());
+ if (cnSelectorNumOfClientManager > 0) {
+ config.setSelectorNumOfClientManager(cnSelectorNumOfClientManager);
+ }
config.setMaxClientNumForEachNode(
Integer.parseInt(
@@ -155,6 +158,17 @@ public class CommonDescriptor {
String.valueOf(config.getMaxClientNumForEachNode()))
.trim()));
+ int cnMaxIdleClientNumForEachNode =
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "cn_max_idle_client_count_for_each_node_in_client_manager",
+ String.valueOf(config.getMaxIdleClientNumForEachNode()))
+ .trim());
+ if (cnMaxIdleClientNumForEachNode >= 0) {
+ config.setMaxIdleClientNumForEachNode(cnMaxIdleClientNumForEachNode);
+ }
+
config.setDnConnectionTimeoutInMS(
Integer.parseInt(
properties
@@ -170,13 +184,16 @@ public class CommonDescriptor {
String.valueOf(config.isRpcThriftCompressionEnabled()))
.trim()));
- config.setSelectorNumOfClientManager(
+ int dnSelectorNumOfClientManager =
Integer.parseInt(
properties
.getProperty(
"dn_selector_thread_nums_of_client_manager",
String.valueOf(config.getSelectorNumOfClientManager()))
- .trim()));
+ .trim());
+ if (dnSelectorNumOfClientManager > 0) {
+ config.setSelectorNumOfClientManager(dnSelectorNumOfClientManager);
+ }
config.setMaxClientNumForEachNode(
Integer.parseInt(
@@ -186,6 +203,17 @@ public class CommonDescriptor {
String.valueOf(config.getMaxClientNumForEachNode()))
.trim()));
+ int dnMaxIdleClientNumForEachNode =
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "dn_max_idle_client_count_for_each_node_in_client_manager",
+ String.valueOf(config.getMaxIdleClientNumForEachNode()))
+ .trim());
+ if (dnMaxIdleClientNumForEachNode >= 0) {
+ config.setMaxIdleClientNumForEachNode(dnMaxIdleClientNumForEachNode);
+ }
+
config.setHandleSystemErrorStrategy(
HandleSystemErrorStrategy.valueOf(
properties
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
index 8b12d667f74..be7cf49239a 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
@@ -214,6 +214,7 @@ public class ClientManagerTest {
manager, new
ThriftClientProperty.Builder().build()),
new
ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
.setMaxClientNumForEachNode(maxClientForEachNode)
+
.setMaxIdleClientNumForEachNode(maxClientForEachNode)
.setMinIdleTimeForClient(minIdleDuration)
.setTimeBetweenEvictionRuns(evictionRunsDuration)
.build()
@@ -294,6 +295,7 @@ public class ClientManagerTest {
manager, new
ThriftClientProperty.Builder().build()),
new
ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
.setMaxClientNumForEachNode(maxTotalClientForEachNode)
+
.setMaxIdleClientNumForEachNode(maxTotalClientForEachNode)
.setWaitClientTimeoutMs(waitClientTimeoutMs)
.build()
.getConfig());
@@ -369,6 +371,7 @@ public class ClientManagerTest {
new
ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
.setWaitClientTimeoutMs(waitClientTimeoutMS)
.setMaxClientNumForEachNode(maxTotalClientForEachNode)
+
.setMaxIdleClientNumForEachNode(maxTotalClientForEachNode)
.build()
.getConfig());
}
@@ -617,7 +620,13 @@ public class ClientManagerTest {
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(CONNECTION_TIMEOUT)
.build()),
- new
ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>().build().getConfig());
+ new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
+ .setMaxClientNumForEachNode(
+
ClientPoolProperty.DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE)
+ .setMaxIdleClientNumForEachNode(
+
ClientPoolProperty.DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE)
+ .build()
+ .getConfig());
}
}
@@ -632,7 +641,13 @@ public class ClientManagerTest {
manager,
new
ThriftClientProperty.Builder().setConnectionTimeoutMs(CONNECTION_TIMEOUT).build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
- new
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
+ new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
+ .setMaxClientNumForEachNode(
+
ClientPoolProperty.DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE)
+ .setMaxIdleClientNumForEachNode(
+
ClientPoolProperty.DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE)
+ .build()
+ .getConfig());
}
}
}