This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 95d06f6de16 Fix AsyncClientManager selector thread leak #14203
95d06f6de16 is described below
commit 95d06f6de16013f4bb19cda3b73d4545fc411d8f
Author: Potato <[email protected]>
AuthorDate: Tue Nov 26 17:32:20 2024 +0800
Fix AsyncClientManager selector thread leak #14203
Signed-off-by: OneSizeFitQuorum <[email protected]>
---
.../iot/client/IoTConsensusClientPool.java | 5 ++--
.../iotdb/consensus/ratis/RatisConsensus.java | 3 +-
.../protocol/client/DataNodeClientPoolFactory.java | 5 ++--
.../apache/iotdb/commons/client/ClientManager.java | 11 +++++--
.../iotdb/commons/client/ClientPoolFactory.java | 35 +++++++++++-----------
.../iotdb/commons/client/IClientPoolFactory.java | 4 +--
.../client/factory/AsyncThriftClientFactory.java | 6 ++++
.../iotdb/commons/client/ClientManagerTest.java | 11 ++++---
8 files changed, 44 insertions(+), 36 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
index 4f6c182ab1d..307ef6eb803 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
@@ -29,7 +29,6 @@ import
org.apache.iotdb.commons.client.property.ThriftClientProperty.DefaultProp
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
-import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
public class IoTConsensusClientPool {
@@ -48,7 +47,7 @@ public class IoTConsensusClientPool {
}
@Override
- public KeyedObjectPool<TEndPoint, SyncIoTConsensusServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint, SyncIoTConsensusServiceClient>
createClientPool(
ClientManager<TEndPoint, SyncIoTConsensusServiceClient> manager) {
GenericKeyedObjectPool<TEndPoint, SyncIoTConsensusServiceClient>
clientPool =
new GenericKeyedObjectPool<>(
@@ -84,7 +83,7 @@ public class IoTConsensusClientPool {
}
@Override
- public KeyedObjectPool<TEndPoint, AsyncIoTConsensusServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint, AsyncIoTConsensusServiceClient>
createClientPool(
ClientManager<TEndPoint, AsyncIoTConsensusServiceClient> manager) {
GenericKeyedObjectPool<TEndPoint, AsyncIoTConsensusServiceClient>
clientPool =
new GenericKeyedObjectPool<>(
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 4a293fb3fd4..9b4c3274e6a 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -53,7 +53,6 @@ import org.apache.iotdb.consensus.ratis.utils.RetryPolicy;
import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.Parameters;
@@ -980,7 +979,7 @@ class RatisConsensus implements IConsensus {
}
@Override
- public KeyedObjectPool<RaftGroup, RatisClient> createClientPool(
+ public GenericKeyedObjectPool<RaftGroup, RatisClient> createClientPool(
ClientManager<RaftGroup, RatisClient> manager) {
GenericKeyedObjectPool<RaftGroup, RatisClient> clientPool =
new GenericKeyedObjectPool<>(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
index 03300e82e3a..b5f5df43012 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
public class DataNodeClientPoolFactory {
@@ -43,7 +42,7 @@ public class DataNodeClientPoolFactory {
implements IClientPoolFactory<ConfigRegionId, ConfigNodeClient> {
@Override
- public KeyedObjectPool<ConfigRegionId, ConfigNodeClient> createClientPool(
+ public GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient>
createClientPool(
ClientManager<ConfigRegionId, ConfigNodeClient> manager) {
GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient> clientPool =
new GenericKeyedObjectPool<>(
@@ -67,7 +66,7 @@ public class DataNodeClientPoolFactory {
implements IClientPoolFactory<ConfigRegionId, ConfigNodeClient> {
@Override
- public KeyedObjectPool<ConfigRegionId, ConfigNodeClient> createClientPool(
+ public GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient>
createClientPool(
ClientManager<ConfigRegionId, ConfigNodeClient> manager) {
GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient> clientPool =
new GenericKeyedObjectPool<>(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index f04569d752a..56bc67b6399 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.commons.client;
import
org.apache.iotdb.commons.client.exception.BorrowNullClientManagerException;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,14 +34,14 @@ public class ClientManager<K, V> implements
IClientManager<K, V> {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClientManager.class);
- private final KeyedObjectPool<K, V> pool;
+ private final GenericKeyedObjectPool<K, V> pool;
ClientManager(IClientPoolFactory<K, V> factory) {
pool = factory.createClientPool(this);
}
@TestOnly
- public KeyedObjectPool<K, V> getPool() {
+ public GenericKeyedObjectPool<K, V> getPool() {
return pool;
}
@@ -90,5 +91,9 @@ public class ClientManager<K, V> implements IClientManager<K,
V> {
@Override
public void close() {
pool.close();
+ // we need to release tManagers for AsyncThriftClientFactory
+ if (pool.getFactory() instanceof AsyncThriftClientFactory) {
+ ((AsyncThriftClientFactory<K, V>) pool.getFactory()).close();
+ }
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index c59416d1eaa..05e67ae7fa1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -40,7 +40,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
public class ClientPoolFactory {
@@ -53,7 +52,7 @@ public class ClientPoolFactory {
implements IClientPoolFactory<TEndPoint, SyncConfigNodeIServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient>
createClientPool(
ClientManager<TEndPoint, SyncConfigNodeIServiceClient> manager) {
GenericKeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient>
clientPool =
new GenericKeyedObjectPool<>(
@@ -74,7 +73,7 @@ public class ClientPoolFactory {
implements IClientPoolFactory<TEndPoint,
AsyncConfigNodeInternalServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint, AsyncConfigNodeInternalServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint,
AsyncConfigNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient>
manager) {
GenericKeyedObjectPool<TEndPoint, AsyncConfigNodeInternalServiceClient>
clientPool =
new GenericKeyedObjectPool<>(
@@ -99,7 +98,7 @@ public class ClientPoolFactory {
implements IClientPoolFactory<TEndPoint,
SyncDataNodeInternalServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint,
SyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
GenericKeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient>
clientPool =
new GenericKeyedObjectPool<>(
@@ -122,7 +121,7 @@ public class ClientPoolFactory {
implements IClientPoolFactory<TEndPoint,
AsyncDataNodeInternalServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint,
AsyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
GenericKeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient>
clientPool =
new GenericKeyedObjectPool<>(
@@ -147,7 +146,7 @@ public class ClientPoolFactory {
implements IClientPoolFactory<TEndPoint,
AsyncDataNodeExternalServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint, AsyncDataNodeExternalServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint,
AsyncDataNodeExternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeExternalServiceClient> manager) {
GenericKeyedObjectPool<TEndPoint, AsyncDataNodeExternalServiceClient>
clientPool =
new GenericKeyedObjectPool<>(
@@ -172,7 +171,7 @@ public class ClientPoolFactory {
implements IClientPoolFactory<TEndPoint,
AsyncConfigNodeInternalServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint, AsyncConfigNodeInternalServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint,
AsyncConfigNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncConfigNodeInternalServiceClient>
manager) {
GenericKeyedObjectPool<TEndPoint, AsyncConfigNodeInternalServiceClient>
clientPool =
@@ -198,7 +197,7 @@ public class ClientPoolFactory {
public static class AsyncDataNodeHeartbeatServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
AsyncDataNodeInternalServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint,
AsyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
GenericKeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient>
clientPool =
new GenericKeyedObjectPool<>(
@@ -224,8 +223,9 @@ public class ClientPoolFactory {
implements IClientPoolFactory<TEndPoint,
SyncDataNodeMPPDataExchangeServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint,
SyncDataNodeMPPDataExchangeServiceClient> createClientPool(
- ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
manager) {
+ public GenericKeyedObjectPool<TEndPoint,
SyncDataNodeMPPDataExchangeServiceClient>
+ createClientPool(
+ ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
manager) {
GenericKeyedObjectPool<TEndPoint,
SyncDataNodeMPPDataExchangeServiceClient> clientPool =
new GenericKeyedObjectPool<>(
new SyncDataNodeMPPDataExchangeServiceClient.Factory(
@@ -247,8 +247,9 @@ public class ClientPoolFactory {
implements IClientPoolFactory<TEndPoint,
AsyncDataNodeMPPDataExchangeServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint,
AsyncDataNodeMPPDataExchangeServiceClient> createClientPool(
- ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient>
manager) {
+ public GenericKeyedObjectPool<TEndPoint,
AsyncDataNodeMPPDataExchangeServiceClient>
+ createClientPool(
+ ClientManager<TEndPoint,
AsyncDataNodeMPPDataExchangeServiceClient> manager) {
GenericKeyedObjectPool<TEndPoint,
AsyncDataNodeMPPDataExchangeServiceClient> clientPool =
new GenericKeyedObjectPool<>(
new AsyncDataNodeMPPDataExchangeServiceClient.Factory(
@@ -272,7 +273,7 @@ public class ClientPoolFactory {
implements IClientPoolFactory<TEndPoint,
AsyncPipeDataTransferServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint, AsyncPipeDataTransferServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint,
AsyncPipeDataTransferServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) {
final GenericKeyedObjectPool<TEndPoint,
AsyncPipeDataTransferServiceClient> clientPool =
new GenericKeyedObjectPool<>(
@@ -299,7 +300,7 @@ public class ClientPoolFactory {
public static class AsyncAINodeHeartbeatServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, AsyncAINodeServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint, AsyncAINodeServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint, AsyncAINodeServiceClient>
createClientPool(
ClientManager<TEndPoint, AsyncAINodeServiceClient> manager) {
GenericKeyedObjectPool<TEndPoint, AsyncAINodeServiceClient> clientPool =
new GenericKeyedObjectPool<>(
@@ -326,7 +327,7 @@ public class ClientPoolFactory {
implements IClientPoolFactory<TEndPoint, AINodeClient> {
@Override
- public KeyedObjectPool<TEndPoint, AINodeClient> createClientPool(
+ public GenericKeyedObjectPool<TEndPoint, AINodeClient> createClientPool(
ClientManager<TEndPoint, AINodeClient> manager) {
GenericKeyedObjectPool<TEndPoint, AINodeClient> clientPool =
new GenericKeyedObjectPool<>(
@@ -356,7 +357,7 @@ public class ClientPoolFactory {
}
@Override
- public KeyedObjectPool<TEndPoint, SyncPipeConsensusServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint, SyncPipeConsensusServiceClient>
createClientPool(
ClientManager<TEndPoint, SyncPipeConsensusServiceClient> manager) {
GenericKeyedObjectPool<TEndPoint, SyncPipeConsensusServiceClient>
clientPool =
new GenericKeyedObjectPool<>(
@@ -389,7 +390,7 @@ public class ClientPoolFactory {
}
@Override
- public KeyedObjectPool<TEndPoint, AsyncPipeConsensusServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint, AsyncPipeConsensusServiceClient>
createClientPool(
ClientManager<TEndPoint, AsyncPipeConsensusServiceClient> manager) {
GenericKeyedObjectPool<TEndPoint, AsyncPipeConsensusServiceClient>
clientPool =
new GenericKeyedObjectPool<>(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
index 5382cf71904..ccaea14e8fb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.commons.client;
-import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
public interface IClientPoolFactory<K, V> {
@@ -30,5 +30,5 @@ public interface IClientPoolFactory<K, V> {
* @param manager the reference to the clientManager
* @return A concurrency safe object pool
*/
- KeyedObjectPool<K, V> createClientPool(ClientManager<K, V> manager);
+ GenericKeyedObjectPool<K, V> createClientPool(ClientManager<K, V> manager);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java
index afde802f4c4..81d16bf84da 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java
@@ -54,4 +54,10 @@ public abstract class AsyncThriftClientFactory<K, V> extends
ThriftClientFactory
.collect(Collectors.toList())
.forEach(thread -> thread.setName(threadName + "-selector-" +
thread.getId()));
}
+
+ public void close() {
+ for (TAsyncClientManager tManager : tManagers) {
+ tManager.stop();
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
index 48bf893979e..8b12d667f74 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.schema.cache.CacheClearOptions;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
-import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
@@ -207,7 +206,7 @@ public class ClientManagerTest {
.createClientManager(
new TestSyncDataNodeInternalServiceClientPoolFactory() {
@Override
- public KeyedObjectPool<TEndPoint,
SyncDataNodeInternalServiceClient>
+ public GenericKeyedObjectPool<TEndPoint,
SyncDataNodeInternalServiceClient>
createClientPool(
ClientManager<TEndPoint,
SyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
@@ -287,7 +286,7 @@ public class ClientManagerTest {
.createClientManager(
new TestSyncDataNodeInternalServiceClientPoolFactory() {
@Override
- public KeyedObjectPool<TEndPoint,
SyncDataNodeInternalServiceClient>
+ public GenericKeyedObjectPool<TEndPoint,
SyncDataNodeInternalServiceClient>
createClientPool(
ClientManager<TEndPoint,
SyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
@@ -361,7 +360,7 @@ public class ClientManagerTest {
.createClientManager(
new TestSyncDataNodeInternalServiceClientPoolFactory() {
@Override
- public KeyedObjectPool<TEndPoint,
SyncDataNodeInternalServiceClient>
+ public GenericKeyedObjectPool<TEndPoint,
SyncDataNodeInternalServiceClient>
createClientPool(
ClientManager<TEndPoint,
SyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
@@ -610,7 +609,7 @@ public class ClientManagerTest {
implements IClientPoolFactory<TEndPoint,
SyncDataNodeInternalServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint,
SyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new SyncDataNodeInternalServiceClient.Factory(
@@ -626,7 +625,7 @@ public class ClientManagerTest {
implements IClientPoolFactory<TEndPoint,
AsyncDataNodeInternalServiceClient> {
@Override
- public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient>
createClientPool(
+ public GenericKeyedObjectPool<TEndPoint,
AsyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new AsyncDataNodeInternalServiceClient.Factory(