This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e05b262559 Add the ability for ClientManager to periodically clean up 
idle objects
9e05b262559 is described below

commit 9e05b262559e7e7418ab58499517ebbe42049717
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Jan 22 04:01:18 2024 -0600

    Add the ability for ClientManager to periodically clean up idle objects
---
 .../resources/conf/iotdb-confignode.properties     |  5 ---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    | 15 -------
 .../confignode/conf/ConfigNodeDescriptor.java      |  8 ----
 .../manager/consensus/ConsensusManager.java        |  2 -
 .../iotdb/consensus/config/IoTConsensusConfig.java | 16 --------
 .../apache/iotdb/consensus/config/RatisConfig.java | 16 --------
 .../iot/client/IoTConsensusClientPool.java         |  2 -
 .../iotdb/consensus/ratis/RatisConsensus.java      |  1 -
 .../resources/conf/iotdb-datanode.properties       |  5 ---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 -------
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 15 -------
 .../db/consensus/DataRegionConsensusImpl.java      |  2 -
 .../db/consensus/SchemaRegionConsensusImpl.java    |  1 -
 .../protocol/client/DataNodeClientPoolFactory.java |  2 -
 .../resources/conf/iotdb-common.properties         |  3 --
 .../iotdb/commons/client/ClientPoolFactory.java    | 29 ++-----------
 .../client/property/ClientPoolProperty.java        | 35 ++++++++++++----
 .../apache/iotdb/commons/conf/CommonConfig.java    | 18 ---------
 .../iotdb/commons/conf/CommonDescriptor.java       | 23 -----------
 .../iotdb/commons/pipe/config/PipeConfig.java      |  5 ---
 .../iotdb/commons/client/ClientManagerTest.java    | 47 +++++++++++++++++-----
 21 files changed, 66 insertions(+), 198 deletions(-)

diff --git 
a/iotdb-core/confignode/src/assembly/resources/conf/iotdb-confignode.properties 
b/iotdb-core/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 96dbbd876e3..204623b8253 100644
--- 
a/iotdb-core/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ 
b/iotdb-core/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -105,11 +105,6 @@ cn_seed_config_node=127.0.0.1:10710
 # Datatype: int
 # cn_selector_thread_nums_of_client_manager=1
 
-# The maximum number of clients that can be idle for a node in a clientManager.
-# When the number of idle clients on a node exceeds this number, newly 
returned clients will be released
-# Datatype: int
-# cn_core_client_count_for_each_node_in_client_manager=200
-
 # The maximum number of clients that can be allocated for a node in a 
clientManager.
 # when the number of the client to a single node exceeds this number, the 
thread for applying for a client will be blocked
 # for a while, then ClientManager will throw ClientManagerException if there 
are no clients after the block time.
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 96af2b0e44a..608910104f1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -125,12 +125,6 @@ public class ConfigNodeConfig {
   /** just for test wait for 60 second by default. */
   private int thriftServerAwaitTimeForStopService = 60;
 
-  /**
-   * The maximum number of clients that can be idle for a node in a 
clientManager. When the number
-   * of idle clients on a node exceeds this number, newly returned clients 
will be released.
-   */
-  private int coreClientNumForEachNode = 
DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
-
   /**
    * The maximum number of clients that can be allocated for a node in a 
clientManager. When the
    * number of the client to a single node exceeds this number, the thread for 
applying for a client
@@ -451,15 +445,6 @@ public class ConfigNodeConfig {
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
   }
 
-  public int getCoreClientNumForEachNode() {
-    return coreClientNumForEachNode;
-  }
-
-  public ConfigNodeConfig setCoreClientNumForEachNode(int 
coreClientNumForEachNode) {
-    this.coreClientNumForEachNode = coreClientNumForEachNode;
-    return this;
-  }
-
   public int getMaxClientNumForEachNode() {
     return maxClientNumForEachNode;
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index a1fbb95cd6d..5a6f7ca7697 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -316,14 +316,6 @@ public class ConfigNodeDescriptor {
                     "cn_thrift_max_frame_size", 
String.valueOf(conf.getCnThriftMaxFrameSize()))
                 .trim()));
 
-    conf.setCoreClientNumForEachNode(
-        Integer.parseInt(
-            properties
-                .getProperty(
-                    "cn_core_client_count_for_each_node_in_client_manager",
-                    String.valueOf(conf.getCoreClientNumForEachNode()))
-                .trim()));
-
     conf.setMaxClientNumForEachNode(
         Integer.parseInt(
             properties
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index 1d82735e899..69647bf7488 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -181,8 +181,6 @@ public class ConsensusManager {
                                           
CONF.getConfigNodeRatisInitialSleepTimeMs())
                                       .setClientRetryMaxSleepTimeMs(
                                           
CONF.getConfigNodeRatisMaxSleepTimeMs())
-                                      .setCoreClientNumForEachNode(
-                                          CONF.getCoreClientNumForEachNode())
                                       
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
                                       .build())
                               .setImpl(
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index 91b4fb262ac..f4bb131b3bf 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -80,7 +80,6 @@ public class IoTConsensusConfig {
 
     private final boolean printLogWhenThriftClientEncounterException;
     private final int thriftMaxFrameSize;
-    private final int coreClientNumForEachNode;
     private final int maxClientNumForEachNode;
 
     private RPC(
@@ -93,7 +92,6 @@ public class IoTConsensusConfig {
         int connectionTimeoutInMs,
         boolean printLogWhenThriftClientEncounterException,
         int thriftMaxFrameSize,
-        int coreClientNumForEachNode,
         int maxClientNumForEachNode) {
       this.rpcSelectorThreadNum = rpcSelectorThreadNum;
       this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
@@ -104,7 +102,6 @@ public class IoTConsensusConfig {
       this.connectionTimeoutInMs = connectionTimeoutInMs;
       this.printLogWhenThriftClientEncounterException = 
printLogWhenThriftClientEncounterException;
       this.thriftMaxFrameSize = thriftMaxFrameSize;
-      this.coreClientNumForEachNode = coreClientNumForEachNode;
       this.maxClientNumForEachNode = maxClientNumForEachNode;
     }
 
@@ -144,10 +141,6 @@ public class IoTConsensusConfig {
       return thriftMaxFrameSize;
     }
 
-    public int getCoreClientNumForEachNode() {
-      return coreClientNumForEachNode;
-    }
-
     public int getMaxClientNumForEachNode() {
       return maxClientNumForEachNode;
     }
@@ -168,9 +161,6 @@ public class IoTConsensusConfig {
 
       private boolean printLogWhenThriftClientEncounterException = true;
       private int thriftMaxFrameSize = 536870912;
-
-      private int coreClientNumForEachNode = 
DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
-
       private int maxClientNumForEachNode = 
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
 
       public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
@@ -221,11 +211,6 @@ public class IoTConsensusConfig {
         return this;
       }
 
-      public RPC.Builder setCoreClientNumForEachNode(int 
coreClientNumForEachNode) {
-        this.coreClientNumForEachNode = coreClientNumForEachNode;
-        return this;
-      }
-
       public Builder setMaxClientNumForEachNode(int maxClientNumForEachNode) {
         this.maxClientNumForEachNode = maxClientNumForEachNode;
         return this;
@@ -242,7 +227,6 @@ public class IoTConsensusConfig {
             connectionTimeoutInMs,
             printLogWhenThriftClientEncounterException,
             thriftMaxFrameSize,
-            coreClientNumForEachNode,
             maxClientNumForEachNode);
       }
     }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
index 0e9cffea6d3..9b43bc956bd 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
@@ -821,7 +821,6 @@ public class RatisConfig {
     private final int clientMaxRetryAttempt;
     private final long clientRetryInitialSleepTimeMs;
     private final long clientRetryMaxSleepTimeMs;
-    private final int coreClientNumForEachNode;
     private final int maxClientNumForEachNode;
 
     public Client(
@@ -829,13 +828,11 @@ public class RatisConfig {
         int clientMaxRetryAttempt,
         long clientRetryInitialSleepTimeMs,
         long clientRetryMaxSleepTimeMs,
-        int coreClientNumForEachNode,
         int maxClientNumForEachNode) {
       this.clientRequestTimeoutMillis = clientRequestTimeoutMillis;
       this.clientMaxRetryAttempt = clientMaxRetryAttempt;
       this.clientRetryInitialSleepTimeMs = clientRetryInitialSleepTimeMs;
       this.clientRetryMaxSleepTimeMs = clientRetryMaxSleepTimeMs;
-      this.coreClientNumForEachNode = coreClientNumForEachNode;
       this.maxClientNumForEachNode = maxClientNumForEachNode;
     }
 
@@ -855,10 +852,6 @@ public class RatisConfig {
       return clientRetryMaxSleepTimeMs;
     }
 
-    public int getCoreClientNumForEachNode() {
-      return coreClientNumForEachNode;
-    }
-
     public int getMaxClientNumForEachNode() {
       return maxClientNumForEachNode;
     }
@@ -873,9 +866,6 @@ public class RatisConfig {
       private int clientMaxRetryAttempt = 10;
       private long clientRetryInitialSleepTimeMs = 100;
       private long clientRetryMaxSleepTimeMs = 10000;
-
-      private int coreClientNumForEachNode = 
DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
-
       private int maxClientNumForEachNode = 
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
 
       public Client build() {
@@ -884,7 +874,6 @@ public class RatisConfig {
             clientMaxRetryAttempt,
             clientRetryInitialSleepTimeMs,
             clientRetryMaxSleepTimeMs,
-            coreClientNumForEachNode,
             maxClientNumForEachNode);
       }
 
@@ -908,11 +897,6 @@ public class RatisConfig {
         return this;
       }
 
-      public Builder setCoreClientNumForEachNode(int coreClientNumForEachNode) 
{
-        this.coreClientNumForEachNode = coreClientNumForEachNode;
-        return this;
-      }
-
       public Builder setMaxClientNumForEachNode(int maxClientNumForEachNode) {
         this.maxClientNumForEachNode = maxClientNumForEachNode;
         return this;
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
index f013e218772..4f6c182ab1d 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
@@ -65,7 +65,6 @@ public class IoTConsensusClientPool {
                           
config.getRpc().isPrintLogWhenThriftClientEncounterException())
                       .build()),
               new ClientPoolProperty.Builder<SyncIoTConsensusServiceClient>()
-                  
.setCoreClientNumForEachNode(config.getRpc().getCoreClientNumForEachNode())
                   
.setMaxClientNumForEachNode(config.getRpc().getMaxClientNumForEachNode())
                   .build()
                   .getConfig());
@@ -105,7 +104,6 @@ public class IoTConsensusClientPool {
                       .build(),
                   
ThreadName.ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL.getName()),
               new ClientPoolProperty.Builder<AsyncIoTConsensusServiceClient>()
-                  
.setCoreClientNumForEachNode(config.getRpc().getCoreClientNumForEachNode())
                   
.setMaxClientNumForEachNode(config.getRpc().getMaxClientNumForEachNode())
                   .build()
                   .getConfig());
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index b3cddf00493..950af3658e1 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -796,7 +796,6 @@ class RatisConsensus implements IConsensus {
           new GenericKeyedObjectPool<>(
               new RatisClient.Factory(manager, properties, clientRpc, 
config.getClient()),
               new ClientPoolProperty.Builder<RatisClient>()
-                  
.setCoreClientNumForEachNode(config.getClient().getCoreClientNumForEachNode())
                   
.setMaxClientNumForEachNode(config.getClient().getMaxClientNumForEachNode())
                   .build()
                   .getConfig());
diff --git 
a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties 
b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
index cfe94dc550a..e529154cbba 100644
--- a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
@@ -132,11 +132,6 @@ dn_seed_config_node=127.0.0.1:10710
 # Datatype: int
 # dn_selector_thread_count_of_client_manager=1
 
-# The maximum number of clients that can be idle for a node in a clientManager.
-# When the number of idle clients on a node exceeds this number, newly 
returned clients will be released
-# Datatype: int
-# dn_core_client_count_for_each_node_in_client_manager=200
-
 # The maximum number of clients that can be allocated for a node in a 
clientManager.
 # When the number of the client to a single node exceeds this number, the 
thread for applying for a client will be blocked
 # for a while, then ClientManager will throw ClientManagerException if there 
are no clients after the block time.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0fe486f1d8c..3b6bac840a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -932,12 +932,6 @@ public class IoTDBConfig {
           ? Runtime.getRuntime().availableProcessors() / 4
           : 1;
 
-  /**
-   * The maximum number of clients that can be idle for a node in a 
clientManager. When the number
-   * of idle clients on a node exceeds this number, newly returned clients 
will be released
-   */
-  private int coreClientNumForEachNode = 
DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
-
   /**
    * The maximum number of clients that can be allocated for a node in a 
clientManager. When the
    * number of the client to a single node exceeds this number, the thread for 
applying for a client
@@ -3043,14 +3037,6 @@ public class IoTDBConfig {
     this.maxClientNumForEachNode = maxClientNumForEachNode;
   }
 
-  public int getCoreClientNumForEachNode() {
-    return coreClientNumForEachNode;
-  }
-
-  public void setCoreClientNumForEachNode(int coreClientNumForEachNode) {
-    this.coreClientNumForEachNode = coreClientNumForEachNode;
-  }
-
   public int getSelectorNumOfClientManager() {
     return selectorNumOfClientManager;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f6e56fd60c9..39fb8f51a29 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -241,21 +241,6 @@ public class IoTDBDescriptor {
                     "dn_connection_timeout_ms", 
String.valueOf(conf.getConnectionTimeoutInMS()))
                 .trim()));
 
-    if (properties.getProperty("dn_core_connection_for_internal_service", 
null) != null) {
-      conf.setCoreClientNumForEachNode(
-          Integer.parseInt(
-              
properties.getProperty("dn_core_connection_for_internal_service").trim()));
-      LOGGER.warn(
-          "The parameter dn_core_connection_for_internal_service is out of 
date. Please rename it to 
dn_core_client_count_for_each_node_in_client_manager.");
-    }
-    conf.setCoreClientNumForEachNode(
-        Integer.parseInt(
-            properties
-                .getProperty(
-                    "dn_core_client_count_for_each_node_in_client_manager",
-                    String.valueOf(conf.getCoreClientNumForEachNode()))
-                .trim()));
-
     if (properties.getProperty("dn_max_connection_for_internal_service", null) 
!= null) {
       conf.setMaxClientNumForEachNode(
           Integer.parseInt(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index dcdd8bea4a4..21cbb97eea2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -86,7 +86,6 @@ public class DataRegionConsensusImpl {
                                     .setThriftServerAwaitTimeForStopService(
                                         
CONF.getThriftServerAwaitTimeForStopService())
                                     
.setThriftMaxFrameSize(CONF.getThriftMaxFrameSize())
-                                    
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
                                     
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
                                     .build())
                             .setReplication(
@@ -167,7 +166,6 @@ public class DataRegionConsensusImpl {
                                         
CONF.getDataRatisConsensusInitialSleepTimeMs())
                                     .setClientRetryMaxSleepTimeMs(
                                         
CONF.getDataRatisConsensusMaxSleepTimeMs())
-                                    
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
                                     
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
                                     .build())
                             .setImpl(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index a2f4d95bc48..1bf53889b5f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -128,7 +128,6 @@ public class SchemaRegionConsensusImpl {
                                         
CONF.getDataRatisConsensusInitialSleepTimeMs())
                                     .setClientRetryMaxSleepTimeMs(
                                         
CONF.getDataRatisConsensusMaxSleepTimeMs())
-                                    
.setCoreClientNumForEachNode(CONF.getCoreClientNumForEachNode())
                                     
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
                                     .build())
                             .setImpl(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
index 26e6a001139..03300e82e3a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
@@ -54,7 +54,6 @@ public class DataNodeClientPoolFactory {
                       
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
                       .build()),
               new ClientPoolProperty.Builder<ConfigNodeClient>()
-                  
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
                   
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
                   .build()
                   .getConfig());
@@ -83,7 +82,6 @@ public class DataNodeClientPoolFactory {
                               : 1)
                       .build()),
               new ClientPoolProperty.Builder<ConfigNodeClient>()
-                  
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
                   
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
                   .build()
                   .getConfig());
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 6d8d6782304..162af1852a0 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -955,9 +955,6 @@ data_replication_factor=1
 # Recommend to set this value to less than or equal to 
pipe_sink_max_client_number.
 # pipe_sink_selector_number=4
 
-# The core number of clients that can be used in the sink.
-# pipe_sink_core_client_number=8
-
 # The maximum number of clients that can be used in the sink.
 # pipe_sink_max_client_number=16
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 077906332f0..44ef3894989 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -56,11 +56,7 @@ public class ClientPoolFactory {
                       .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
                       
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
                       .build()),
-              new ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>()
-                  
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-                  
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
-                  .build()
-                  .getConfig());
+              new 
ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>().build().getConfig());
       ClientManagerMetrics.getInstance()
           .registerClientManager(this.getClass().getSimpleName(), clientPool);
       return clientPool;
@@ -83,11 +79,7 @@ public class ClientPoolFactory {
                       
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
                       .build(),
                   ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
-              new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
-                  
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-                  
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
-                  .build()
-                  .getConfig());
+              new 
ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>().build().getConfig());
       ClientManagerMetrics.getInstance()
           .registerClientManager(this.getClass().getSimpleName(), clientPool);
       return clientPool;
@@ -109,8 +101,6 @@ public class ClientPoolFactory {
                       
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
                       .build()),
               new 
ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
-                  
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-                  
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
                   .build()
                   .getConfig());
       ClientManagerMetrics.getInstance()
@@ -136,8 +126,6 @@ public class ClientPoolFactory {
                       .build(),
                   ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
-                  
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-                  
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
                   .build()
                   .getConfig());
       ClientManagerMetrics.getInstance()
@@ -164,11 +152,7 @@ public class ClientPoolFactory {
                       .setPrintLogWhenEncounterException(false)
                       .build(),
                   ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()),
-              new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
-                  
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-                  
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
-                  .build()
-                  .getConfig());
+              new 
ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>().build().getConfig());
       ClientManagerMetrics.getInstance()
           .registerClientManager(this.getClass().getSimpleName(), clientPool);
       return clientPool;
@@ -192,8 +176,6 @@ public class ClientPoolFactory {
                       .build(),
                   ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
-                  
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-                  
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
                   .build()
                   .getConfig());
       ClientManagerMetrics.getInstance()
@@ -217,8 +199,6 @@ public class ClientPoolFactory {
                       
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
                       .build()),
               new 
ClientPoolProperty.Builder<SyncDataNodeMPPDataExchangeServiceClient>()
-                  
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-                  
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
                   .build()
                   .getConfig());
       ClientManagerMetrics.getInstance()
@@ -244,8 +224,6 @@ public class ClientPoolFactory {
                       .build(),
                   
ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
-                  
.setCoreClientNumForEachNode(conf.getCoreClientNumForEachNode())
-                  
.setMaxClientNumForEachNode(conf.getMaxClientNumForEachNode())
                   .build()
                   .getConfig());
       ClientManagerMetrics.getInstance()
@@ -273,7 +251,6 @@ public class ClientPoolFactory {
                       .build(),
                   ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
-                  
.setCoreClientNumForEachNode(conf.getPipeAsyncConnectorCoreClientNumber())
                   
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxClientNumber())
                   .build()
                   .getConfig());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
index 907609658df..c4cd930ec00 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
@@ -45,13 +45,24 @@ public class ClientPoolProperty<V> {
      */
     private long waitClientTimeoutMs = DefaultProperty.WAIT_CLIENT_TIMEOUT_MS;
 
-    /** the maximum number of clients that can be allocated for a node. */
+    /**
+     * the maximum number of clients that can be allocated for a node. When 
some clients are idle
+     * for more than {@code maxIdleTimeForClient}, they will be cleaned up.
+     */
     private int maxClientNumForEachNode = 
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+
     /**
-     * the maximum number of clients that can be idle for a node. When the 
number of idle clients on
-     * a node exceeds this number, newly returned clients will be released.
+     * the minimum amount of time a client may sit idle in the pool before it 
is eligible for
+     * eviction by the idle object evictor.
      */
-    private int coreClientNumForEachNode = 
DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
+    private long minIdleTimeForClient = 
DefaultProperty.MIN_IDLE_TIME_FOR_CLIENT_MS;
+
+    /**
+     * the duration to sleep between runs of the idle object evictor thread. 
When non-positive, no
+     * idle object evictor thread will be run, which means clients that are 
idle for more than
+     * {@code minIdleTimeForClient} will never be cleaned up.
+     */
+    private long timeBetweenEvictionRuns = 
DefaultProperty.TIME_BETWEEN_EVICTION_RUNS_MS;
 
     public Builder<V> setWaitClientTimeoutMs(long waitClientTimeoutMs) {
       this.waitClientTimeoutMs = waitClientTimeoutMs;
@@ -63,15 +74,22 @@ public class ClientPoolProperty<V> {
       return this;
     }
 
-    public Builder<V> setCoreClientNumForEachNode(int 
coreClientNumForEachNode) {
-      this.coreClientNumForEachNode = coreClientNumForEachNode;
+    public Builder<V> setMinIdleTimeForClient(long minIdleTimeForClient) {
+      this.minIdleTimeForClient = minIdleTimeForClient;
+      return this;
+    }
+
+    public Builder<V> setTimeBetweenEvictionRuns(long timeBetweenEvictionRuns) 
{
+      this.timeBetweenEvictionRuns = timeBetweenEvictionRuns;
       return this;
     }
 
     public ClientPoolProperty<V> build() {
       GenericKeyedObjectPoolConfig<V> poolConfig = new 
GenericKeyedObjectPoolConfig<>();
       poolConfig.setMaxTotalPerKey(maxClientNumForEachNode);
-      poolConfig.setMaxIdlePerKey(coreClientNumForEachNode);
+      poolConfig.setMaxIdlePerKey(maxClientNumForEachNode);
+      
poolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(timeBetweenEvictionRuns));
+      
poolConfig.setMinEvictableIdleTime(Duration.ofMillis(minIdleTimeForClient));
       poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMs));
       poolConfig.setTestOnReturn(true);
       poolConfig.setTestOnBorrow(true);
@@ -84,7 +102,8 @@ public class ClientPoolProperty<V> {
     private DefaultProperty() {}
 
     public static final long WAIT_CLIENT_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(10);
+    public static final long MIN_IDLE_TIME_FOR_CLIENT_MS = 
TimeUnit.MINUTES.toMillis(1);
+    public static final long TIME_BETWEEN_EVICTION_RUNS_MS = 
TimeUnit.MINUTES.toMillis(1);
     public static final int MAX_CLIENT_NUM_FOR_EACH_NODE = 300;
-    public static final int CORE_CLIENT_NUM_FOR_EACH_NODE = 200;
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4aacf87a386..a461452d807 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -108,7 +108,6 @@ public class CommonConfig {
   /** Whether to use thrift compression. */
   private boolean isRpcThriftCompressionEnabled = false;
 
-  private int coreClientNumForEachNode = 
DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
   private int maxClientNumForEachNode = 
DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
 
   /** What will the system do when unrecoverable error occurs. */
@@ -172,7 +171,6 @@ public class CommonConfig {
   private boolean pipeConnectorRPCThriftCompressionEnabled = false;
 
   private int pipeAsyncConnectorSelectorNumber = 4;
-  private int pipeAsyncConnectorCoreClientNumber = 8;
   private int pipeAsyncConnectorMaxClientNumber = 16;
 
   private boolean isSeperatedPipeHeartbeatEnabled = true;
@@ -381,14 +379,6 @@ public class CommonConfig {
     this.maxClientNumForEachNode = maxClientNumForEachNode;
   }
 
-  public int getCoreClientNumForEachNode() {
-    return coreClientNumForEachNode;
-  }
-
-  public void setCoreClientNumForEachNode(int coreClientNumForEachNode) {
-    this.coreClientNumForEachNode = coreClientNumForEachNode;
-  }
-
   HandleSystemErrorStrategy getHandleSystemErrorStrategy() {
     return handleSystemErrorStrategy;
   }
@@ -604,14 +594,6 @@ public class CommonConfig {
     this.pipeAsyncConnectorSelectorNumber = pipeAsyncConnectorSelectorNumber;
   }
 
-  public int getPipeAsyncConnectorCoreClientNumber() {
-    return pipeAsyncConnectorCoreClientNumber;
-  }
-
-  public void setPipeAsyncConnectorCoreClientNumber(int 
pipeAsyncConnectorCoreClientNumber) {
-    this.pipeAsyncConnectorCoreClientNumber = 
pipeAsyncConnectorCoreClientNumber;
-  }
-
   public int getPipeAsyncConnectorMaxClientNumber() {
     return pipeAsyncConnectorMaxClientNumber;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index d767cbbf38f..ab51e7b6428 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -134,14 +134,6 @@ public class CommonDescriptor {
                     String.valueOf(config.getSelectorNumOfClientManager()))
                 .trim()));
 
-    config.setCoreClientNumForEachNode(
-        Integer.parseInt(
-            properties
-                .getProperty(
-                    "cn_core_client_count_for_each_node_in_client_manager",
-                    String.valueOf(config.getCoreClientNumForEachNode()))
-                .trim()));
-
     config.setMaxClientNumForEachNode(
         Integer.parseInt(
             properties
@@ -173,14 +165,6 @@ public class CommonDescriptor {
                     String.valueOf(config.getSelectorNumOfClientManager()))
                 .trim()));
 
-    config.setCoreClientNumForEachNode(
-        Integer.parseInt(
-            properties
-                .getProperty(
-                    "dn_core_client_count_for_each_node_in_client_manager",
-                    String.valueOf(config.getCoreClientNumForEachNode()))
-                .trim()));
-
     config.setMaxClientNumForEachNode(
         Integer.parseInt(
             properties
@@ -384,13 +368,6 @@ public class CommonDescriptor {
                     properties.getProperty(
                         "pipe_async_connector_selector_number",
                         
String.valueOf(config.getPipeAsyncConnectorSelectorNumber())))));
-    config.setPipeAsyncConnectorCoreClientNumber(
-        Integer.parseInt(
-            
Optional.ofNullable(properties.getProperty("pipe_sink_core_client_number"))
-                .orElse(
-                    properties.getProperty(
-                        "pipe_async_connector_core_client_number",
-                        
String.valueOf(config.getPipeAsyncConnectorCoreClientNumber())))));
     config.setPipeAsyncConnectorMaxClientNumber(
         Integer.parseInt(
             
Optional.ofNullable(properties.getProperty("pipe_sink_max_client_number"))
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 792cd65e9f3..32fb5b0fdbb 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -119,10 +119,6 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeAsyncConnectorSelectorNumber();
   }
 
-  public int getPipeAsyncConnectorCoreClientNumber() {
-    return COMMON_CONFIG.getPipeAsyncConnectorCoreClientNumber();
-  }
-
   public int getPipeAsyncConnectorMaxClientNumber() {
     return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
   }
@@ -247,7 +243,6 @@ public class PipeConfig {
         isPipeConnectorRPCThriftCompressionEnabled());
 
     LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", 
getPipeAsyncConnectorSelectorNumber());
-    LOGGER.info("PipeAsyncConnectorCoreClientNumber: {}", 
getPipeAsyncConnectorCoreClientNumber());
     LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", 
getPipeAsyncConnectorMaxClientNumber());
 
     LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", 
isSeperatedPipeHeartbeatEnabled());
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
index 431ad50ebe6..81a439dd5f3 100644
--- 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
@@ -42,6 +42,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -96,7 +98,7 @@ public class ClientManagerTest {
   public void allTest() throws Exception {
     normalSyncTest();
     normalAsyncTest();
-    maxIdleTest();
+    evictionTest();
     maxTotalTest();
     maxWaitClientTimeoutTest();
     invalidSyncClientReturnTest();
@@ -190,10 +192,13 @@ public class ClientManagerTest {
     Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
   }
 
-  public void maxIdleTest() throws Exception {
-    int maxIdleClientForEachNode = 1;
+  public void evictionTest() throws Exception {
+    List<SyncDataNodeInternalServiceClient> evictionTestClients = new 
ArrayList<>();
+    int maxClientForEachNode = 2;
+    long minIdleDuration = TimeUnit.SECONDS.toMillis(10);
+    long evictionRunsDuration = TimeUnit.SECONDS.toMillis(2);
 
-    // init syncClientManager and set maxIdleClientForEachNode to 1
+    // init syncClientManager and set minIdleDuation and evictionRunsDuration
     ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
syncClusterManager =
         (ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>)
             new IClientManager.Factory<TEndPoint, 
SyncDataNodeInternalServiceClient>()
@@ -207,7 +212,9 @@ public class ClientManagerTest {
                             new SyncDataNodeInternalServiceClient.Factory(
                                 manager, new 
ThriftClientProperty.Builder().build()),
                             new 
ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
-                                
.setCoreClientNumForEachNode(maxIdleClientForEachNode)
+                                
.setMaxClientNumForEachNode(maxClientForEachNode)
+                                .setMinIdleTimeForClient(minIdleDuration)
+                                
.setTimeBetweenEvictionRuns(evictionRunsDuration)
                                 .build()
                                 .getConfig());
                       }
@@ -215,6 +222,7 @@ public class ClientManagerTest {
 
     // get one sync client
     SyncDataNodeInternalServiceClient syncClient1 = 
syncClusterManager.borrowClient(endPoint);
+    evictionTestClients.add(syncClient1);
     Assert.assertNotNull(syncClient1);
     Assert.assertEquals(syncClient1.getTEndpoint(), endPoint);
     Assert.assertEquals(syncClient1.getClientManager(), syncClusterManager);
@@ -224,6 +232,7 @@ public class ClientManagerTest {
 
     // get another sync client
     SyncDataNodeInternalServiceClient syncClient2 = 
syncClusterManager.borrowClient(endPoint);
+    evictionTestClients.add(syncClient2);
     Assert.assertNotNull(syncClient2);
     Assert.assertEquals(syncClient2.getTEndpoint(), endPoint);
     Assert.assertEquals(syncClient2.getClientManager(), syncClusterManager);
@@ -236,17 +245,33 @@ public class ClientManagerTest {
     Assert.assertEquals(1, 
syncClusterManager.getPool().getNumActive(endPoint));
     Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
 
-    // return another sync client, clientManager should destroy this client
+    // return another sync client
     syncClient2.close();
     Assert.assertEquals(0, 
syncClusterManager.getPool().getNumActive(endPoint));
-    Assert.assertEquals(1, syncClusterManager.getPool().getNumIdle(endPoint));
-    Assert.assertFalse(syncClient2.getInputProtocol().getTransport().isOpen());
+    Assert.assertEquals(2, syncClusterManager.getPool().getNumIdle(endPoint));
 
-    // close syncClientManager, syncClientManager should destroy all client
-    syncClusterManager.close();
+    long start = System.currentTimeMillis();
+    while (syncClusterManager.getPool().getNumIdle() > 0
+        || (syncClient1.getInputProtocol().getTransport().isOpen()
+            || syncClient2.getInputProtocol().getTransport().isOpen())) {
+      for (SyncDataNodeInternalServiceClient evictionTestClient : 
evictionTestClients) {
+        // if this client is evicted, skip it
+        if (!evictionTestClient.getInputProtocol().getTransport().isOpen()) 
continue;
+        // test eviction
+        long current = System.currentTimeMillis();
+        // for each idle client, its theoretical max idle time is 
`minIdleDuration` +
+        // `evictionRunsDuration`. Taking into account the difference in 
thread scheduling rates of
+        // different machines, here we multiply by 6
+        if ((current - start) > (minIdleDuration + evictionRunsDuration) * 6) {
+          Assert.fail("Evict invalid client failed");
+        }
+      }
+      Thread.sleep(100);
+    }
+    // since the two clients are idle for more than 10s, which exceeds 
`minIdleDuration`, they
+    // should be destroyed.
     Assert.assertEquals(0, 
syncClusterManager.getPool().getNumActive(endPoint));
     Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
-    Assert.assertFalse(syncClient1.getInputProtocol().getTransport().isOpen());
   }
 
   public void maxTotalTest() throws Exception {


Reply via email to