This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9e05b262559 Add the ability for ClientManager to periodically clean up
idle objects
9e05b262559 is described below
commit 9e05b262559e7e7418ab58499517ebbe42049717
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Jan 22 04:01:18 2024 -0600
Add the ability for ClientManager to periodically clean up idle objects
---
.../resources/conf/iotdb-confignode.properties | 5 ---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 15 -------
.../confignode/conf/ConfigNodeDescriptor.java | 8 ----
.../manager/consensus/ConsensusManager.java | 2 -
.../iotdb/consensus/config/IoTConsensusConfig.java | 16 --------
.../apache/iotdb/consensus/config/RatisConfig.java | 16 --------
.../iot/client/IoTConsensusClientPool.java | 2 -
.../iotdb/consensus/ratis/RatisConsensus.java | 1 -
.../resources/conf/iotdb-datanode.properties | 5 ---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 -------
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 15 -------
.../db/consensus/DataRegionConsensusImpl.java | 2 -
.../db/consensus/SchemaRegionConsensusImpl.java | 1 -
.../protocol/client/DataNodeClientPoolFactory.java | 2 -
.../resources/conf/iotdb-common.properties | 3 --
.../iotdb/commons/client/ClientPoolFactory.java | 29 ++-----------
.../client/property/ClientPoolProperty.java | 35 ++++++++++++----
.../apache/iotdb/commons/conf/CommonConfig.java | 18 ---------
.../iotdb/commons/conf/CommonDescriptor.java | 23 -----------
.../iotdb/commons/pipe/config/PipeConfig.java | 5 ---
.../iotdb/commons/client/ClientManagerTest.java | 47 +++++++++++++++++-----
21 files changed, 66 insertions(+), 198 deletions(-)
diff --git
a/iotdb-core/confignode/src/assembly/resources/conf/iotdb-confignode.properties
b/iotdb-core/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 96dbbd876e3..204623b8253 100644
---
a/iotdb-core/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++
b/iotdb-core/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -105,11 +105,6 @@ cn_seed_config_node=127.0.0.1:10710
# Datatype: int
# cn_selector_thread_nums_of_client_manager=1
-# The maximum number of clients that can be idle for a node in a clientManager.
-# When the number of idle clients on a node exceeds this number, newly
returned clients will be released
-# Datatype: int
-# cn_core_client_count_for_each_node_in_client_manager=200
-
# The maximum number of clients that can be allocated for a node in a
clientManager.
# when the number of the client to a single node exceeds this number, the
thread for applying for a client will be blocked
# for a while, then ClientManager will throw ClientManagerException if there
are no clients after the block time.
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 96af2b0e44a..608910104f1 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -125,12 +125,6 @@ public class ConfigNodeConfig {
/** just for test wait for 60 second by default. */
private int thriftServerAwaitTimeForStopService = 60;
- /**
- * The maximum number of clients that can be idle for a node in a
clientManager. When the number
- * of idle clients on a node exceeds this number, newly returned clients
will be released.
- */
- private int coreClientNumForEachNode =
DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
-
/**
* The maximum number of clients that can be allocated for a node in a
clientManager. When the
* number of the client to a single node exceeds this number, the thread for
applying for a client
@@ -451,15 +445,6 @@ public class ConfigNodeConfig {
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
}
- public int getCoreClientNumForEachNode() {
- return coreClientNumForEachNode;
- }
-
- public ConfigNodeConfig setCoreClientNumForEachNode(int
coreClientNumForEachNode) {
- this.coreClientNumForEachNode = coreClientNumForEachNode;
- return this;
- }
-
public int getMaxClientNumForEachNode() {
return maxClientNumForEachNode;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index a1fbb95cd6d..5a6f7ca7697 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -316,14 +316,6 @@ public class ConfigNodeDescriptor {
"cn_thrift_max_frame_size",
String.valueOf(conf.getCnThriftMaxFrameSize()))
.trim()));
- conf.setCoreClientNumForEachNode(
- Integer.parseInt(
- properties
- .getProperty(
- "cn_core_client_count_for_each_node_in_client_manager",
- String.valueOf(conf.getCoreClientNumForEachNode()))
- .trim()));
-
conf.setMaxClientNumForEachNode(
Integer.parseInt(
properties
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index 1d82735e899..69647bf7488 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -181,8 +181,6 @@ public class ConsensusManager {
CONF.getConfigNodeRatisInitialSleepTimeMs())
.setClientRetryMaxSleepTimeMs(
CONF.getConfigNodeRatisMaxSleepTimeMs())
- .setCoreClientNumForEachNode(
- CONF.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
.build())
.setImpl(
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index 91b4fb262ac..f4bb131b3bf 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -80,7 +80,6 @@ public class IoTConsensusConfig {
private final boolean printLogWhenThriftClientEncounterException;
private final int thriftMaxFrameSize;
- private final int coreClientNumForEachNode;
private final int maxClientNumForEachNode;
private RPC(
@@ -93,7 +92,6 @@ public class IoTConsensusConfig {
int connectionTimeoutInMs,
boolean printLogWhenThriftClientEncounterException,
int thriftMaxFrameSize,
- int coreClientNumForEachNode,
int maxClientNumForEachNode) {
this.rpcSelectorThreadNum = rpcSelectorThreadNum;
this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
@@ -104,7 +102,6 @@ public class IoTConsensusConfig {
this.connectionTimeoutInMs = connectionTimeoutInMs;
this.printLogWhenThriftClientEncounterException =
printLogWhenThriftClientEncounterException;
this.thriftMaxFrameSize = thriftMaxFrameSize;
- this.coreClientNumForEachNode = coreClientNumForEachNode;
this.maxClientNumForEachNode = maxClientNumForEachNode;
}
@@ -144,10 +141,6 @@ public class IoTConsensusConfig {
return thriftMaxFrameSize;
}
- public int getCoreClientNumForEachNode() {
- return coreClientNumForEachNode;
- }
-
public int getMaxClientNumForEachNode() {
return maxClientNumForEachNode;
}
@@ -168,9 +161,6 @@ public class IoTConsensusConfig {
private boolean printLogWhenThriftClientEncounterException = true;
private int thriftMaxFrameSize = 536870912;
-
- private int coreClientNumForEachNode =
DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
-
private int maxClientNumForEachNode =
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
@@ -221,11 +211,6 @@ public class IoTConsensusConfig {
return this;
}
- public RPC.Builder setCoreClientNumForEachNode(int
coreClientNumForEachNode) {
- this.coreClientNumForEachNode = coreClientNumForEachNode;
- return this;
- }
-
public Builder setMaxClientNumForEachNode(int maxClientNumForEachNode) {
this.maxClientNumForEachNode = maxClientNumForEachNode;
return this;
@@ -242,7 +227,6 @@ public class IoTConsensusConfig {
connectionTimeoutInMs,
printLogWhenThriftClientEncounterException,
thriftMaxFrameSize,
- coreClientNumForEachNode,
maxClientNumForEachNode);
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
index 0e9cffea6d3..9b43bc956bd 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
@@ -821,7 +821,6 @@ public class RatisConfig {
private final int clientMaxRetryAttempt;
private final long clientRetryInitialSleepTimeMs;
private final long clientRetryMaxSleepTimeMs;
- private final int coreClientNumForEachNode;
private final int maxClientNumForEachNode;
public Client(
@@ -829,13 +828,11 @@ public class RatisConfig {
int clientMaxRetryAttempt,
long clientRetryInitialSleepTimeMs,
long clientRetryMaxSleepTimeMs,
- int coreClientNumForEachNode,
int maxClientNumForEachNode) {
this.clientRequestTimeoutMillis = clientRequestTimeoutMillis;
this.clientMaxRetryAttempt = clientMaxRetryAttempt;
this.clientRetryInitialSleepTimeMs = clientRetryInitialSleepTimeMs;
this.clientRetryMaxSleepTimeMs = clientRetryMaxSleepTimeMs;
- this.coreClientNumForEachNode = coreClientNumForEachNode;
this.maxClientNumForEachNode = maxClientNumForEachNode;
}
@@ -855,10 +852,6 @@ public class RatisConfig {
return clientRetryMaxSleepTimeMs;
}
- public int getCoreClientNumForEachNode() {
- return coreClientNumForEachNode;
- }
-
public int getMaxClientNumForEachNode() {
return maxClientNumForEachNode;
}
@@ -873,9 +866,6 @@ public class RatisConfig {
private int clientMaxRetryAttempt = 10;
private long clientRetryInitialSleepTimeMs = 100;
private long clientRetryMaxSleepTimeMs = 10000;
-
- private int coreClientNumForEachNode =
DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
-
private int maxClientNumForEachNode =
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
public Client build() {
@@ -884,7 +874,6 @@ public class RatisConfig {
clientMaxRetryAttempt,
clientRetryInitialSleepTimeMs,
clientRetryMaxSleepTimeMs,
- coreClientNumForEachNode,
maxClientNumForEachNode);
}
@@ -908,11 +897,6 @@ public class RatisConfig {
return this;
}
- public Builder setCoreClientNumForEachNode(int coreClientNumForEachNode)
{
- this.coreClientNumForEachNode = coreClientNumForEachNode;
- return this;
- }
-
public Builder setMaxClientNumForEachNode(int maxClientNumForEachNode) {
this.maxClientNumForEachNode = maxClientNumForEachNode;
return this;
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
index f013e218772..4f6c182ab1d 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
@@ -65,7 +65,6 @@ public class IoTConsensusClientPool {
config.getRpc().isPrintLogWhenThriftClientEncounterException())
.build()),
new ClientPoolProperty.Builder<SyncIoTConsensusServiceClient>()
-
.setCoreClientNumForEachNode(config.getRpc().getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(config.getRpc().getMaxClientNumForEachNode())
.build()
.getConfig());
@@ -105,7 +104,6 @@ public class IoTConsensusClientPool {
.build(),
ThreadName.ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncIoTConsensusServiceClient>()
-
.setCoreClientNumForEachNode(config.getRpc().getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(config.getRpc().getMaxClientNumForEachNode())
.build()
.getConfig());
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index b3cddf00493..950af3658e1 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -796,7 +796,6 @@ class RatisConsensus implements IConsensus {
new GenericKeyedObjectPool<>(
new RatisClient.Factory(manager, properties, clientRpc,
config.getClient()),
new ClientPoolProperty.Builder<RatisClient>()
-
.setCoreClientNumForEachNode(config.getClient().getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(config.getClient().getMaxClientNumForEachNode())
.build()
.getConfig());
diff --git
a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
index cfe94dc550a..e529154cbba 100644
--- a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
@@ -132,11 +132,6 @@ dn_seed_config_node=127.0.0.1:10710
# Datatype: int
# dn_selector_thread_count_of_client_manager=1
-# The maximum number of clients that can be idle for a node in a clientManager.
-# When the number of idle clients on a node exceeds this number, newly
returned clients will be released
-# Datatype: int
-# dn_core_client_count_for_each_node_in_client_manager=200
-
# The maximum number of clients that can be allocated for a node in a
clientManager.
# When the number of the client to a single node exceeds this number, the
thread for applying for a client will be blocked
# for a while, then ClientManager will throw ClientManagerException if there
are no clients after the block time.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0fe486f1d8c..3b6bac840a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -932,12 +932,6 @@ public class IoTDBConfig {
? Runtime.getRuntime().availableProcessors() / 4
: 1;
- /**
- * The maximum number of clients that can be idle for a node in a
clientManager. When the number
- * of idle clients on a node exceeds this number, newly returned clients
will be released
- */
- private int coreClientNumForEachNode =
DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
-
/**
* The maximum number of clients that can be allocated for a node in a
clientManager. When the
* number of the client to a single node exceeds this number, the thread for
applying for a client
@@ -3043,14 +3037,6 @@ public class IoTDBConfig {
this.maxClientNumForEachNode = maxClientNumForEachNode;
}
- public int getCoreClientNumForEachNode() {
- return coreClientNumForEachNode;
- }
-
- public void setCoreClientNumForEachNode(int coreClientNumForEachNode) {
- this.coreClientNumForEachNode = coreClientNumForEachNode;
- }
-
public int getSelectorNumOfClientManager() {
return selectorNumOfClientManager;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f6e56fd60c9..39fb8f51a29 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -241,21 +241,6 @@ public class IoTDBDescriptor {
"dn_connection_timeout_ms",
String.valueOf(conf.getConnectionTimeoutInMS()))
.trim()));
- if (properties.getProperty("dn_core_connection_for_internal_service",
null) != null) {
- conf.setCoreClientNumForEachNode(
- Integer.parseInt(
-
properties.getProperty("dn_core_connection_for_internal_service").trim()));
- LOGGER.warn(
- "The parameter dn_core_connection_for_internal_service is out of
date. Please rename it to
dn_core_client_count_for_each_node_in_client_manager.");
- }
- conf.setCoreClientNumForEachNode(
- Integer.parseInt(
- properties
- .getProperty(
- "dn_core_client_count_for_each_node_in_client_manager",
- String.valueOf(conf.getCoreClientNumForEachNode()))
- .trim()));
-
if (properties.getProperty("dn_max_connection_for_internal_service", null)
!= null) {
conf.setMaxClientNumForEachNode(
Integer.parseInt(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index dcdd8bea4a4..21cbb97eea2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -86,7 +86,6 @@ public class DataRegionConsensusImpl {
.setThriftServerAwaitTimeForStopService(
CONF.getThriftServerAwaitTimeForStopService())
.setThriftMaxFrameSize(CONF.getThriftMaxFrameSize())
-
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
.build())
.setReplication(
@@ -167,7 +166,6 @@ public class DataRegionConsensusImpl {
CONF.getDataRatisConsensusInitialSleepTimeMs())
.setClientRetryMaxSleepTimeMs(
CONF.getDataRatisConsensusMaxSleepTimeMs())
-
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
.build())
.setImpl(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index a2f4d95bc48..1bf53889b5f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -128,7 +128,6 @@ public class SchemaRegionConsensusImpl {
CONF.getDataRatisConsensusInitialSleepTimeMs())
.setClientRetryMaxSleepTimeMs(
CONF.getDataRatisConsensusMaxSleepTimeMs())
-
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
.build())
.setImpl(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
index 26e6a001139..03300e82e3a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
@@ -54,7 +54,6 @@ public class DataNodeClientPoolFactory {
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
.build()),
new ClientPoolProperty.Builder<ConfigNodeClient>()
-
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
@@ -83,7 +82,6 @@ public class DataNodeClientPoolFactory {
: 1)
.build()),
new ClientPoolProperty.Builder<ConfigNodeClient>()
-
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 6d8d6782304..162af1852a0 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -955,9 +955,6 @@ data_replication_factor=1
# Recommend to set this value to less than or equal to
pipe_sink_max_client_number.
# pipe_sink_selector_number=4
-# The core number of clients that can be used in the sink.
-# pipe_sink_core_client_number=8
-
# The maximum number of clients that can be used in the sink.
# pipe_sink_max_client_number=16
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 077906332f0..44ef3894989 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -56,11 +56,7 @@ public class ClientPoolFactory {
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.build()),
- new ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>()
-
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
- .build()
- .getConfig());
+ new
ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>().build().getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
@@ -83,11 +79,7 @@ public class ClientPoolFactory {
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.build(),
ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
- new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
-
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
- .build()
- .getConfig());
+ new
ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>().build().getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
@@ -109,8 +101,6 @@ public class ClientPoolFactory {
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.build()),
new
ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
-
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
@@ -136,8 +126,6 @@ public class ClientPoolFactory {
.build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
-
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
@@ -164,11 +152,7 @@ public class ClientPoolFactory {
.setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()),
- new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
-
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
- .build()
- .getConfig());
+ new
ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>().build().getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
@@ -192,8 +176,6 @@ public class ClientPoolFactory {
.build(),
ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
-
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
@@ -217,8 +199,6 @@ public class ClientPoolFactory {
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.build()),
new
ClientPoolProperty.Builder<SyncDataNodeMPPDataExchangeServiceClient>()
-
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
@@ -244,8 +224,6 @@ public class ClientPoolFactory {
.build(),
ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
-
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
@@ -273,7 +251,6 @@ public class ClientPoolFactory {
.build(),
ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-
.setCoreClientNumForEachNode(conf.getPipeAsyncConnectorCoreClientNumber())
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxClientNumber())
.build()
.getConfig());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
index 907609658df..c4cd930ec00 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
@@ -45,13 +45,24 @@ public class ClientPoolProperty<V> {
*/
private long waitClientTimeoutMs = DefaultProperty.WAIT_CLIENT_TIMEOUT_MS;
- /** the maximum number of clients that can be allocated for a node. */
+ /**
+ * the maximum number of clients that can be allocated for a node. When
some clients are idle
+ * for more than {@code minIdleTimeForClient}, they will be cleaned up.
+ */
private int maxClientNumForEachNode =
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+
/**
- * the maximum number of clients that can be idle for a node. When the
number of idle clients on
- * a node exceeds this number, newly returned clients will be released.
+ * the minimum amount of time a client may sit idle in the pool before it
is eligible for
+ * eviction by the idle object evictor.
*/
- private int coreClientNumForEachNode =
DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
+ private long minIdleTimeForClient =
DefaultProperty.MIN_IDLE_TIME_FOR_CLIENT_MS;
+
+ /**
+ * the duration to sleep between runs of the idle object evictor thread.
When non-positive, no
+ * idle object evictor thread will be run, which means clients that are
idle for more than
+ * {@code minIdleTimeForClient} will never be cleaned up.
+ */
+ private long timeBetweenEvictionRuns =
DefaultProperty.TIME_BETWEEN_EVICTION_RUNS_MS;
public Builder<V> setWaitClientTimeoutMs(long waitClientTimeoutMs) {
this.waitClientTimeoutMs = waitClientTimeoutMs;
@@ -63,15 +74,22 @@ public class ClientPoolProperty<V> {
return this;
}
- public Builder<V> setCoreClientNumForEachNode(int
coreClientNumForEachNode) {
- this.coreClientNumForEachNode = coreClientNumForEachNode;
+ public Builder<V> setMinIdleTimeForClient(long minIdleTimeForClient) {
+ this.minIdleTimeForClient = minIdleTimeForClient;
+ return this;
+ }
+
+ public Builder<V> setTimeBetweenEvictionRuns(long timeBetweenEvictionRuns)
{
+ this.timeBetweenEvictionRuns = timeBetweenEvictionRuns;
return this;
}
public ClientPoolProperty<V> build() {
GenericKeyedObjectPoolConfig<V> poolConfig = new
GenericKeyedObjectPoolConfig<>();
poolConfig.setMaxTotalPerKey(maxClientNumForEachNode);
- poolConfig.setMaxIdlePerKey(coreClientNumForEachNode);
+ poolConfig.setMaxIdlePerKey(maxClientNumForEachNode);
+
poolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(timeBetweenEvictionRuns));
+
poolConfig.setMinEvictableIdleTime(Duration.ofMillis(minIdleTimeForClient));
poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMs));
poolConfig.setTestOnReturn(true);
poolConfig.setTestOnBorrow(true);
@@ -84,7 +102,8 @@ public class ClientPoolProperty<V> {
private DefaultProperty() {}
public static final long WAIT_CLIENT_TIMEOUT_MS =
TimeUnit.SECONDS.toMillis(10);
+ public static final long MIN_IDLE_TIME_FOR_CLIENT_MS =
TimeUnit.MINUTES.toMillis(1);
+ public static final long TIME_BETWEEN_EVICTION_RUNS_MS =
TimeUnit.MINUTES.toMillis(1);
public static final int MAX_CLIENT_NUM_FOR_EACH_NODE = 300;
- public static final int CORE_CLIENT_NUM_FOR_EACH_NODE = 200;
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4aacf87a386..a461452d807 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -108,7 +108,6 @@ public class CommonConfig {
/** Whether to use thrift compression. */
private boolean isRpcThriftCompressionEnabled = false;
- private int coreClientNumForEachNode =
DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
private int maxClientNumForEachNode =
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
/** What will the system do when unrecoverable error occurs. */
@@ -172,7 +171,6 @@ public class CommonConfig {
private boolean pipeConnectorRPCThriftCompressionEnabled = false;
private int pipeAsyncConnectorSelectorNumber = 4;
- private int pipeAsyncConnectorCoreClientNumber = 8;
private int pipeAsyncConnectorMaxClientNumber = 16;
private boolean isSeperatedPipeHeartbeatEnabled = true;
@@ -381,14 +379,6 @@ public class CommonConfig {
this.maxClientNumForEachNode = maxClientNumForEachNode;
}
- public int getCoreClientNumForEachNode() {
- return coreClientNumForEachNode;
- }
-
- public void setCoreClientNumForEachNode(int coreClientNumForEachNode) {
- this.coreClientNumForEachNode = coreClientNumForEachNode;
- }
-
HandleSystemErrorStrategy getHandleSystemErrorStrategy() {
return handleSystemErrorStrategy;
}
@@ -604,14 +594,6 @@ public class CommonConfig {
this.pipeAsyncConnectorSelectorNumber = pipeAsyncConnectorSelectorNumber;
}
- public int getPipeAsyncConnectorCoreClientNumber() {
- return pipeAsyncConnectorCoreClientNumber;
- }
-
- public void setPipeAsyncConnectorCoreClientNumber(int
pipeAsyncConnectorCoreClientNumber) {
- this.pipeAsyncConnectorCoreClientNumber =
pipeAsyncConnectorCoreClientNumber;
- }
-
public int getPipeAsyncConnectorMaxClientNumber() {
return pipeAsyncConnectorMaxClientNumber;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index d767cbbf38f..ab51e7b6428 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -134,14 +134,6 @@ public class CommonDescriptor {
String.valueOf(config.getSelectorNumOfClientManager()))
.trim()));
- config.setCoreClientNumForEachNode(
- Integer.parseInt(
- properties
- .getProperty(
- "cn_core_client_count_for_each_node_in_client_manager",
- String.valueOf(config.getCoreClientNumForEachNode()))
- .trim()));
-
config.setMaxClientNumForEachNode(
Integer.parseInt(
properties
@@ -173,14 +165,6 @@ public class CommonDescriptor {
String.valueOf(config.getSelectorNumOfClientManager()))
.trim()));
- config.setCoreClientNumForEachNode(
- Integer.parseInt(
- properties
- .getProperty(
- "dn_core_client_count_for_each_node_in_client_manager",
- String.valueOf(config.getCoreClientNumForEachNode()))
- .trim()));
-
config.setMaxClientNumForEachNode(
Integer.parseInt(
properties
@@ -384,13 +368,6 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_async_connector_selector_number",
String.valueOf(config.getPipeAsyncConnectorSelectorNumber())))));
- config.setPipeAsyncConnectorCoreClientNumber(
- Integer.parseInt(
-
Optional.ofNullable(properties.getProperty("pipe_sink_core_client_number"))
- .orElse(
- properties.getProperty(
- "pipe_async_connector_core_client_number",
-
String.valueOf(config.getPipeAsyncConnectorCoreClientNumber())))));
config.setPipeAsyncConnectorMaxClientNumber(
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_max_client_number"))
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 792cd65e9f3..32fb5b0fdbb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -119,10 +119,6 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber();
}
- public int getPipeAsyncConnectorCoreClientNumber() {
- return COMMON_CONFIG.getPipeAsyncConnectorCoreClientNumber();
- }
-
public int getPipeAsyncConnectorMaxClientNumber() {
return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
}
@@ -247,7 +243,6 @@ public class PipeConfig {
isPipeConnectorRPCThriftCompressionEnabled());
LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
- LOGGER.info("PipeAsyncConnectorCoreClientNumber: {}",
getPipeAsyncConnectorCoreClientNumber());
LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());
LOGGER.info("SeperatedPipeHeartbeatEnabled: {}",
isSeperatedPipeHeartbeatEnabled());
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
index 431ad50ebe6..81a439dd5f3 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
@@ -42,6 +42,8 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -96,7 +98,7 @@ public class ClientManagerTest {
public void allTest() throws Exception {
normalSyncTest();
normalAsyncTest();
- maxIdleTest();
+ evictionTest();
maxTotalTest();
maxWaitClientTimeoutTest();
invalidSyncClientReturnTest();
@@ -190,10 +192,13 @@ public class ClientManagerTest {
Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
}
- public void maxIdleTest() throws Exception {
- int maxIdleClientForEachNode = 1;
+ public void evictionTest() throws Exception {
+ List<SyncDataNodeInternalServiceClient> evictionTestClients = new
ArrayList<>();
+ int maxClientForEachNode = 2;
+ long minIdleDuration = TimeUnit.SECONDS.toMillis(10);
+ long evictionRunsDuration = TimeUnit.SECONDS.toMillis(2);
- // init syncClientManager and set maxIdleClientForEachNode to 1
+ // init syncClientManager and set minIdleDuration and evictionRunsDuration
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
syncClusterManager =
(ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>)
new IClientManager.Factory<TEndPoint,
SyncDataNodeInternalServiceClient>()
@@ -207,7 +212,9 @@ public class ClientManagerTest {
new SyncDataNodeInternalServiceClient.Factory(
manager, new
ThriftClientProperty.Builder().build()),
new
ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
-
.setCoreClientNumForEachNode(maxIdleClientForEachNode)
+
.setMaxClientNumForEachNode(maxClientForEachNode)
+ .setMinIdleTimeForClient(minIdleDuration)
+
.setTimeBetweenEvictionRuns(evictionRunsDuration)
.build()
.getConfig());
}
@@ -215,6 +222,7 @@ public class ClientManagerTest {
// get one sync client
SyncDataNodeInternalServiceClient syncClient1 =
syncClusterManager.borrowClient(endPoint);
+ evictionTestClients.add(syncClient1);
Assert.assertNotNull(syncClient1);
Assert.assertEquals(syncClient1.getTEndpoint(), endPoint);
Assert.assertEquals(syncClient1.getClientManager(), syncClusterManager);
@@ -224,6 +232,7 @@ public class ClientManagerTest {
// get another sync client
SyncDataNodeInternalServiceClient syncClient2 =
syncClusterManager.borrowClient(endPoint);
+ evictionTestClients.add(syncClient2);
Assert.assertNotNull(syncClient2);
Assert.assertEquals(syncClient2.getTEndpoint(), endPoint);
Assert.assertEquals(syncClient2.getClientManager(), syncClusterManager);
@@ -236,17 +245,33 @@ public class ClientManagerTest {
Assert.assertEquals(1,
syncClusterManager.getPool().getNumActive(endPoint));
Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
- // return another sync client, clientManager should destroy this client
+ // return another sync client
syncClient2.close();
Assert.assertEquals(0,
syncClusterManager.getPool().getNumActive(endPoint));
- Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
- Assert.assertFalse(syncClient2.getInputProtocol().getTransport().isOpen());
+ Assert.assertEquals(2, syncClusterManager.getPool().getNumIdle(endPoint));
- // close syncClientManager, syncClientManager should destroy all client
- syncClusterManager.close();
+ long start = System.currentTimeMillis();
+ while (syncClusterManager.getPool().getNumIdle() > 0
+ || (syncClient1.getInputProtocol().getTransport().isOpen()
+ || syncClient2.getInputProtocol().getTransport().isOpen())) {
+ for (SyncDataNodeInternalServiceClient evictionTestClient :
evictionTestClients) {
+ // if this client is evicted, skip it
+ if (!evictionTestClient.getInputProtocol().getTransport().isOpen())
continue;
+ // test eviction
+ long current = System.currentTimeMillis();
+ // for each idle client, its theoretical max idle time is `minIdleDuration` +
+ // `evictionRunsDuration`. Taking into account the difference in thread scheduling rates of
+ // different machines, here we multiply by 6
+ if ((current - start) > (minIdleDuration + evictionRunsDuration) * 6) {
+ Assert.fail("Evict invalid client failed");
+ }
+ }
+ Thread.sleep(100);
+ }
+ // since the two clients are idle for more than 10s, which exceeds `minIdleDuration`, they
+ // should be destroyed.
Assert.assertEquals(0,
syncClusterManager.getPool().getNumActive(endPoint));
Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
- Assert.assertFalse(syncClient1.getInputProtocol().getTransport().isOpen());
}
public void maxTotalTest() throws Exception {