This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch fix_asyncclientmanager_selector_thread_leak in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3ffb96480ed14e82cc5ad689c79d3e289269a4f1 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Tue Nov 26 15:52:47 2024 +0800 fix Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../iot/client/IoTConsensusClientPool.java | 5 ++-- .../iotdb/consensus/ratis/RatisConsensus.java | 3 +- .../protocol/client/DataNodeClientPoolFactory.java | 5 ++-- .../apache/iotdb/commons/client/ClientManager.java | 11 +++++-- .../iotdb/commons/client/ClientPoolFactory.java | 35 +++++++++++----------- .../iotdb/commons/client/IClientPoolFactory.java | 4 +-- .../client/factory/AsyncThriftClientFactory.java | 6 ++++ .../iotdb/commons/client/ClientManagerTest.java | 11 ++++--- 8 files changed, 44 insertions(+), 36 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java index 4f6c182ab1d..307ef6eb803 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java @@ -29,7 +29,6 @@ import org.apache.iotdb.commons.client.property.ThriftClientProperty.DefaultProp import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.consensus.config.IoTConsensusConfig; -import org.apache.commons.pool2.KeyedObjectPool; import org.apache.commons.pool2.impl.GenericKeyedObjectPool; public class IoTConsensusClientPool { @@ -48,7 +47,7 @@ public class IoTConsensusClientPool { } @Override - public KeyedObjectPool<TEndPoint, SyncIoTConsensusServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, SyncIoTConsensusServiceClient> createClientPool( ClientManager<TEndPoint, SyncIoTConsensusServiceClient> manager) { GenericKeyedObjectPool<TEndPoint, SyncIoTConsensusServiceClient> clientPool = new GenericKeyedObjectPool<>( @@ -84,7 +83,7 @@ public class IoTConsensusClientPool { } @Override - public KeyedObjectPool<TEndPoint, AsyncIoTConsensusServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, AsyncIoTConsensusServiceClient> createClientPool( ClientManager<TEndPoint, AsyncIoTConsensusServiceClient> manager) { GenericKeyedObjectPool<TEndPoint, AsyncIoTConsensusServiceClient> clientPool = new GenericKeyedObjectPool<>( diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 4a293fb3fd4..9b4c3274e6a 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -53,7 +53,6 @@ import org.apache.iotdb.consensus.ratis.utils.RetryPolicy; import org.apache.iotdb.consensus.ratis.utils.Utils; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.commons.pool2.KeyedObjectPool; import org.apache.commons.pool2.impl.GenericKeyedObjectPool; import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.conf.Parameters; @@ -980,7 +979,7 @@ class RatisConsensus implements IConsensus { } @Override - public KeyedObjectPool<RaftGroup, RatisClient> createClientPool( + public GenericKeyedObjectPool<RaftGroup, RatisClient> createClientPool( ClientManager<RaftGroup, RatisClient> manager) { GenericKeyedObjectPool<RaftGroup, RatisClient> clientPool = new GenericKeyedObjectPool<>( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java index 03300e82e3a..b5f5df43012 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java @@ -28,7 +28,6 @@ import org.apache.iotdb.commons.consensus.ConfigRegionId; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.commons.pool2.KeyedObjectPool; import org.apache.commons.pool2.impl.GenericKeyedObjectPool; public class DataNodeClientPoolFactory { @@ -43,7 +42,7 @@ public class DataNodeClientPoolFactory { implements IClientPoolFactory<ConfigRegionId, ConfigNodeClient> { @Override - public KeyedObjectPool<ConfigRegionId, ConfigNodeClient> createClientPool( + public GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient> createClientPool( ClientManager<ConfigRegionId, ConfigNodeClient> manager) { GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient> clientPool = new GenericKeyedObjectPool<>( @@ -67,7 +66,7 @@ public class DataNodeClientPoolFactory { implements IClientPoolFactory<ConfigRegionId, ConfigNodeClient> { @Override - public KeyedObjectPool<ConfigRegionId, ConfigNodeClient> createClientPool( + public GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient> createClientPool( ClientManager<ConfigRegionId, ConfigNodeClient> manager) { GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient> clientPool = new GenericKeyedObjectPool<>( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java index f04569d752a..56bc67b6399 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java @@ -21,9 +21,10 @@ package org.apache.iotdb.commons.client; import org.apache.iotdb.commons.client.exception.BorrowNullClientManagerException; import org.apache.iotdb.commons.client.exception.ClientManagerException; +import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory; import org.apache.iotdb.commons.utils.TestOnly; -import org.apache.commons.pool2.KeyedObjectPool; +import org.apache.commons.pool2.impl.GenericKeyedObjectPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,14 +34,14 @@ public class ClientManager<K, V> implements IClientManager<K, V> { private static final Logger LOGGER = LoggerFactory.getLogger(ClientManager.class); - private final KeyedObjectPool<K, V> pool; + private final GenericKeyedObjectPool<K, V> pool; ClientManager(IClientPoolFactory<K, V> factory) { pool = factory.createClientPool(this); } @TestOnly - public KeyedObjectPool<K, V> getPool() { + public GenericKeyedObjectPool<K, V> getPool() { return pool; } @@ -90,5 +91,9 @@ public class ClientManager<K, V> implements IClientManager<K, V> { @Override public void close() { pool.close(); + // we need to release tManagers for AsyncThriftClientFactory + if (pool.getFactory() instanceof AsyncThriftClientFactory) { + ((AsyncThriftClientFactory<K, V>) pool.getFactory()).close(); + } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index c59416d1eaa..05e67ae7fa1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -40,7 +40,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.commons.pool2.KeyedObjectPool; import org.apache.commons.pool2.impl.GenericKeyedObjectPool; public class ClientPoolFactory { @@ -53,7 +52,7 @@ public class ClientPoolFactory { implements IClientPoolFactory<TEndPoint, SyncConfigNodeIServiceClient> { @Override - public KeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient> createClientPool( ClientManager<TEndPoint, SyncConfigNodeIServiceClient> manager) { GenericKeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient> clientPool = new GenericKeyedObjectPool<>( @@ -74,7 +73,7 @@ public class ClientPoolFactory { implements IClientPoolFactory<TEndPoint, AsyncConfigNodeInternalServiceClient> { @Override - public KeyedObjectPool<TEndPoint, AsyncConfigNodeInternalServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, AsyncConfigNodeInternalServiceClient> createClientPool( ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient> manager) { GenericKeyedObjectPool<TEndPoint, AsyncConfigNodeInternalServiceClient> clientPool = new GenericKeyedObjectPool<>( @@ -99,7 +98,7 @@ public class ClientPoolFactory { implements IClientPoolFactory<TEndPoint, SyncDataNodeInternalServiceClient> { @Override - public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool( ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) { GenericKeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> clientPool = new GenericKeyedObjectPool<>( @@ -122,7 +121,7 @@ public class ClientPoolFactory { implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> { @Override - public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool( ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) { GenericKeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> clientPool = new GenericKeyedObjectPool<>( @@ -147,7 +146,7 @@ public class ClientPoolFactory { implements IClientPoolFactory<TEndPoint, AsyncDataNodeExternalServiceClient> { @Override - public KeyedObjectPool<TEndPoint, AsyncDataNodeExternalServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, AsyncDataNodeExternalServiceClient> createClientPool( ClientManager<TEndPoint, AsyncDataNodeExternalServiceClient> manager) { GenericKeyedObjectPool<TEndPoint, AsyncDataNodeExternalServiceClient> clientPool = new GenericKeyedObjectPool<>( @@ -172,7 +171,7 @@ public class ClientPoolFactory { implements IClientPoolFactory<TEndPoint, AsyncConfigNodeInternalServiceClient> { @Override - public KeyedObjectPool<TEndPoint, AsyncConfigNodeInternalServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, AsyncConfigNodeInternalServiceClient> createClientPool( ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient> manager) { GenericKeyedObjectPool<TEndPoint, AsyncConfigNodeInternalServiceClient> clientPool = @@ -198,7 +197,7 @@ public class ClientPoolFactory { public static class AsyncDataNodeHeartbeatServiceClientPoolFactory implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> { @Override - public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool( ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) { GenericKeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> clientPool = new GenericKeyedObjectPool<>( @@ -224,8 +223,9 @@ public class ClientPoolFactory { implements IClientPoolFactory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> { @Override - public KeyedObjectPool<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> createClientPool( - ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> manager) { + public GenericKeyedObjectPool<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> + createClientPool( + ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> manager) { GenericKeyedObjectPool<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientPool = new GenericKeyedObjectPool<>( new SyncDataNodeMPPDataExchangeServiceClient.Factory( @@ -247,8 +247,9 @@ public class ClientPoolFactory { implements IClientPoolFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> { @Override - public KeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> createClientPool( - ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> manager) { + public GenericKeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> + createClientPool( + ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> manager) { GenericKeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> clientPool = new GenericKeyedObjectPool<>( new AsyncDataNodeMPPDataExchangeServiceClient.Factory( @@ -272,7 +273,7 @@ public class ClientPoolFactory { implements IClientPoolFactory<TEndPoint, AsyncPipeDataTransferServiceClient> { @Override - public KeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> createClientPool( ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) { final GenericKeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient> clientPool = new GenericKeyedObjectPool<>( @@ -299,7 +300,7 @@ public class ClientPoolFactory { public static class AsyncAINodeHeartbeatServiceClientPoolFactory implements IClientPoolFactory<TEndPoint, AsyncAINodeServiceClient> { @Override - public KeyedObjectPool<TEndPoint, AsyncAINodeServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, AsyncAINodeServiceClient> createClientPool( ClientManager<TEndPoint, AsyncAINodeServiceClient> manager) { GenericKeyedObjectPool<TEndPoint, AsyncAINodeServiceClient> clientPool = new GenericKeyedObjectPool<>( @@ -326,7 +327,7 @@ public class ClientPoolFactory { implements IClientPoolFactory<TEndPoint, AINodeClient> { @Override - public KeyedObjectPool<TEndPoint, AINodeClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, AINodeClient> createClientPool( ClientManager<TEndPoint, AINodeClient> manager) { GenericKeyedObjectPool<TEndPoint, AINodeClient> clientPool = new GenericKeyedObjectPool<>( @@ -356,7 +357,7 @@ public class ClientPoolFactory { } @Override - public KeyedObjectPool<TEndPoint, SyncPipeConsensusServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, SyncPipeConsensusServiceClient> createClientPool( ClientManager<TEndPoint, SyncPipeConsensusServiceClient> manager) { GenericKeyedObjectPool<TEndPoint, SyncPipeConsensusServiceClient> clientPool = new GenericKeyedObjectPool<>( @@ -389,7 +390,7 @@ public class ClientPoolFactory { } @Override - public KeyedObjectPool<TEndPoint, AsyncPipeConsensusServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, AsyncPipeConsensusServiceClient> createClientPool( ClientManager<TEndPoint, AsyncPipeConsensusServiceClient> manager) { GenericKeyedObjectPool<TEndPoint, AsyncPipeConsensusServiceClient> clientPool = new GenericKeyedObjectPool<>( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java index 5382cf71904..ccaea14e8fb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java @@ -19,7 +19,7 @@ package org.apache.iotdb.commons.client; -import org.apache.commons.pool2.KeyedObjectPool; +import org.apache.commons.pool2.impl.GenericKeyedObjectPool; public interface IClientPoolFactory<K, V> { @@ -30,5 +30,5 @@ public interface IClientPoolFactory<K, V> { * @param manager the reference to the clientManager * @return A concurrency safe object pool */ - KeyedObjectPool<K, V> createClientPool(ClientManager<K, V> manager); + GenericKeyedObjectPool<K, V> createClientPool(ClientManager<K, V> manager); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java index afde802f4c4..81d16bf84da 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java @@ -54,4 +54,10 @@ public abstract class AsyncThriftClientFactory<K, V> extends ThriftClientFactory .collect(Collectors.toList()) .forEach(thread -> thread.setName(threadName + "-selector-" + thread.getId())); } + + public void close() { + for (TAsyncClientManager tManager : tManagers) { + tManager.stop(); + } + } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java index 48bf893979e..8b12d667f74 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java @@ -33,7 +33,6 @@ import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.schema.cache.CacheClearOptions; import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService; -import org.apache.commons.pool2.KeyedObjectPool; import org.apache.commons.pool2.impl.GenericKeyedObjectPool; import org.apache.thrift.TException; import org.apache.thrift.async.AsyncMethodCallback; @@ -207,7 +206,7 @@ public class ClientManagerTest { .createClientManager( new TestSyncDataNodeInternalServiceClientPoolFactory() { @Override - public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> + public GenericKeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool( ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) { return new GenericKeyedObjectPool<>( @@ -287,7 +286,7 @@ public class ClientManagerTest { .createClientManager( new TestSyncDataNodeInternalServiceClientPoolFactory() { @Override - public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> + public GenericKeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool( ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) { return new GenericKeyedObjectPool<>( @@ -361,7 +360,7 @@ public class ClientManagerTest { .createClientManager( new TestSyncDataNodeInternalServiceClientPoolFactory() { @Override - public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> + public GenericKeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool( ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) { return new GenericKeyedObjectPool<>( @@ -610,7 +609,7 @@ public class ClientManagerTest { implements IClientPoolFactory<TEndPoint, SyncDataNodeInternalServiceClient> { @Override - public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool( ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) { return new GenericKeyedObjectPool<>( new SyncDataNodeInternalServiceClient.Factory( @@ -626,7 +625,7 @@ public class ClientManagerTest { implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> { @Override - public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool( + public GenericKeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool( ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) { return new GenericKeyedObjectPool<>( new AsyncDataNodeInternalServiceClient.Factory(
