This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch client_manager
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 85f4bba214677dfa61bdec81a1e666a0531d7c1e
Author: LebronAl <[email protected]>
AuthorDate: Tue Apr 19 14:49:45 2022 +0800

    init
---
 .../iotdb/confignode/client/ClientPoolFactory.java |  68 +++++++++++
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  27 ++++-
 node-commons/pom.xml                               |  10 +-
 .../commons/client/AsyncBaseClientFactory.java     |  47 ++++++++
 .../iotdb/commons/client/BaseClientFactory.java    |  41 +++++++
 .../apache/iotdb/commons/client/ClientManager.java |  71 ++++++++++++
 .../commons/client/ClientManagerProperty.java      | 123 ++++++++++++++++++++
 .../iotdb/commons/client/IClientManager.java       |  32 ++++++
 .../iotdb/commons/client/IClientPoolFactory.java   |  30 +++++
 .../async/AsyncDataNodeInternalServiceClient.java  | 126 +++++++++++++++++++++
 .../sync/SyncDataNodeInternalServiceClient.java    | 116 +++++++++++++++++++
 .../apache/iotdb/db/client/ClientPoolFactory.java  | 101 +++++++++++++++++
 .../async/AsyncConfigNodeIServiceClient.java       | 125 ++++++++++++++++++++
 .../async/AsyncDataNodeDataBlockServiceClient.java | 125 ++++++++++++++++++++
 .../client/sync/SyncConfigNodeIServiceClient.java  | 114 +++++++++++++++++++
 .../sync/SyncDataNodeDataBlockServiceClient.java   | 115 +++++++++++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  24 ++++
 17 files changed, 1290 insertions(+), 5 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java
new file mode 100644
index 0000000000..73bb856065
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.commons.client.IClientPoolFactory;
+import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.confignode.conf.ConfigNodeConf;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+
+/**
+ * Pool factories used by the ConfigNode to build keyed client pools for the DataNode internal
+ * service. Connection settings are read from the ConfigNode configuration.
+ */
+public class ClientPoolFactory {
+
+  private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
+
+  /** Builds the pool of synchronous DataNode internal-service clients. */
+  public static class SyncDataNodeInternalServiceClientPoolFactory
+      implements IClientPoolFactory<SyncDataNodeInternalServiceClient> {
+    @Override
+    public KeyedObjectPool<EndPoint, SyncDataNodeInternalServiceClient> createClientPool(
+        ClientManager<SyncDataNodeInternalServiceClient> manager) {
+      ClientManagerProperty.Builder<SyncDataNodeInternalServiceClient> builder =
+          new ClientManagerProperty.Builder<>();
+      builder.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS());
+      builder.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled());
+      ClientManagerProperty<SyncDataNodeInternalServiceClient> property = builder.build();
+
+      // The client factory receives the manager so that clients can be handed back to it.
+      SyncDataNodeInternalServiceClient.Factory clientFactory =
+          new SyncDataNodeInternalServiceClient.Factory(manager, property);
+      return new GenericKeyedObjectPool<>(clientFactory, property.getConfig());
+    }
+  }
+
+  /** Builds the pool of asynchronous DataNode internal-service clients. */
+  public static class AsyncDataNodeInternalServiceClientPoolFactory
+      implements IClientPoolFactory<AsyncDataNodeInternalServiceClient> {
+    @Override
+    public KeyedObjectPool<EndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
+        ClientManager<AsyncDataNodeInternalServiceClient> manager) {
+      ClientManagerProperty.Builder<AsyncDataNodeInternalServiceClient> builder =
+          new ClientManagerProperty.Builder<>();
+      builder.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS());
+      builder.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled());
+      // Only the async pool needs selector threads.
+      builder.setSelectorNumOfAsyncClientPool(conf.getSelectorNumOfClientPool());
+      ClientManagerProperty<AsyncDataNodeInternalServiceClient> property = builder.build();
+
+      AsyncDataNodeInternalServiceClient.Factory clientFactory =
+          new AsyncDataNodeInternalServiceClient.Factory(manager, property);
+      return new GenericKeyedObjectPool<>(clientFactory, property.getConfig());
+    }
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 27a60ea35b..9a9b6aa336 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 
 import java.io.File;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 
 public class ConfigNodeConf {
 
@@ -32,10 +33,12 @@ public class ConfigNodeConf {
 
   /** used for communication between data node and config node */
   private int rpcPort = 22277;
-
   /** used for communication between config node and config node */
   private int internalPort = 22278;
 
+  /** Thrift socket and connection timeout between data node and config node */
+  private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20);
+
   /** ConfigNode consensus protocol */
   private String configNodeConsensusProtocolClass =
       "org.apache.iotdb.consensus.ratis.RatisConsensus";
@@ -46,6 +49,15 @@ public class ConfigNodeConf {
   private Endpoint[] configNodeGroupAddressList =
       Collections.singletonList(new Endpoint("0.0.0.0", 22278)).toArray(new 
Endpoint[0]);
 
+  /**
+   * ClientPool will have so many selector threads (TAsyncClientManager) to 
distribute to its
+   * clients.
+   */
+  private final int selectorNumOfClientPool =
+      Runtime.getRuntime().availableProcessors() / 4 > 0
+          ? Runtime.getRuntime().availableProcessors() / 4
+          : 1;
+
   /** Number of SeriesPartitionSlots per StorageGroup */
   private int seriesPartitionSlotNum = 10000;
 
@@ -141,6 +153,10 @@ public class ConfigNodeConf {
     this.seriesPartitionExecutorClass = seriesPartitionExecutorClass;
   }
 
+  public int getSelectorNumOfClientPool() {
+    return selectorNumOfClientPool;
+  }
+
   public long getTimePartitionInterval() {
     return timePartitionInterval;
   }
@@ -213,6 +229,15 @@ public class ConfigNodeConf {
     this.internalPort = internalPort;
   }
 
+  public int getConnectionTimeoutInMS() {
+    return connectionTimeoutInMS;
+  }
+
+  public ConfigNodeConf setConnectionTimeoutInMS(int connectionTimeoutInMS) {
+    this.connectionTimeoutInMS = connectionTimeoutInMS;
+    return this;
+  }
+
   public String getConsensusDir() {
     return consensusDir;
   }
diff --git a/node-commons/pom.xml b/node-commons/pom.xml
index 3997cc6c00..f4e3587e38 100644
--- a/node-commons/pom.xml
+++ b/node-commons/pom.xml
@@ -39,7 +39,6 @@
         <dependency>
             <groupId>org.apache.thrift</groupId>
             <artifactId>libthrift</artifactId>
-            <version>${thrift.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
@@ -47,9 +46,12 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>[${guava.version},)</version>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
         </dependency>
     </dependencies>
     <build>
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
new file mode 100644
index 0000000000..b710f5482c
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client;
+
+import org.apache.thrift.async.TAsyncClientManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Base factory for pooled asynchronous Thrift clients. It creates the {@link TAsyncClientManager}
+ * instances up front so that subclasses can share them among the clients they create.
+ *
+ * @param <K> key type of the keyed pool
+ * @param <T> pooled client type
+ */
+public abstract class AsyncBaseClientFactory<K, T> extends BaseClientFactory<K, T> {
+
+  private static final Logger logger = LoggerFactory.getLogger(AsyncBaseClientFactory.class);
+
+  /** Shared selector managers; a slot stays {@code null} when its creation failed. */
+  protected TAsyncClientManager[] tManagers;
+
+  /** Counter available to subclasses, e.g. for picking a slot of {@link #tManagers}. */
+  protected AtomicInteger clientCnt = new AtomicInteger();
+
+  /**
+   * Creates the factory and eagerly instantiates the selector managers.
+   *
+   * @param clientManager manager handed to the clients created by subclasses
+   * @param clientManagerProperty settings; its selector count sizes {@link #tManagers}
+   */
+  protected AsyncBaseClientFactory(
+      ClientManager<T> clientManager, ClientManagerProperty<T> clientManagerProperty) {
+    super(clientManager, clientManagerProperty);
+    // Keep at least one slot: a negative count cannot size an array, and an empty array would
+    // make any "index % tManagers.length" computation divide by zero.
+    int selectorNum = Math.max(1, clientManagerProperty.getSelectorNumOfAsyncClientPool());
+    tManagers = new TAsyncClientManager[selectorNum];
+    for (int i = 0; i < tManagers.length; i++) {
+      try {
+        tManagers[i] = new TAsyncClientManager();
+      } catch (IOException e) {
+        // Best effort: the slot is left null and the failure is only logged.
+        logger.error("Cannot create Async client factory", e);
+      }
+    }
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
new file mode 100644
index 0000000000..f628692152
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client;
+
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+
+/**
+ * Common base of the keyed pooled-object factories for clients. It stores the manager and the
+ * settings that subclasses pass to the clients they create, and supplies no-op activation and
+ * passivation hooks.
+ *
+ * @param <K> key type of the keyed pool
+ * @param <T> pooled client type
+ */
+public abstract class BaseClientFactory<K, T> implements KeyedPooledObjectFactory<K, T> {
+
+  protected ClientManager<T> clientManager;
+  protected ClientManagerProperty<T> clientManagerProperty;
+
+  /**
+   * Stores the collaborators for use by subclasses. The constructor is {@code protected} because
+   * an abstract class can only be instantiated through a subclass.
+   *
+   * @param clientManager manager the created clients belong to
+   * @param clientManagerProperty settings used when creating clients
+   */
+  protected BaseClientFactory(
+      ClientManager<T> clientManager, ClientManagerProperty<T> clientManagerProperty) {
+    this.clientManager = clientManager;
+    this.clientManagerProperty = clientManagerProperty;
+  }
+
+  /** Does nothing when an object is activated. */
+  @Override
+  public void activateObject(K node, PooledObject<T> pooledObject) {}
+
+  /** Does nothing when an object is passivated. */
+  @Override
+  public void passivateObject(K node, PooledObject<T> pooledObject) {}
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
new file mode 100644
index 0000000000..10ad2090c0
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * {@link IClientManager} backed by a keyed object pool with one sub-pool per {@link EndPoint}.
+ *
+ * @param <E> client type held by the pool
+ */
+public class ClientManager<E> implements IClientManager<E> {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClientManager.class);
+
+  private final KeyedObjectPool<EndPoint, E> pool;
+
+  /**
+   * Builds the pool through the given factory.
+   *
+   * @param factory creates the keyed pool; it receives this manager as its argument
+   */
+  public ClientManager(IClientPoolFactory<E> factory) {
+    pool = factory.createClientPool(this);
+  }
+
+  /**
+   * Borrows a client for the given node.
+   *
+   * @param node endpoint the client should talk to
+   * @return the borrowed client, or an empty {@link Optional} when borrowing failed for a reason
+   *     other than an I/O or transport problem
+   * @throws IOException if the pool reported an {@link IOException} or a {@link
+   *     TTransportException} (the latter is wrapped)
+   */
+  @Override
+  public Optional<E> borrowClient(EndPoint node) throws IOException {
+    try {
+      return Optional.of(pool.borrowObject(node));
+    } catch (TTransportException e) {
+      // external needs to check transport related exception
+      throw new IOException(e);
+    } catch (IOException e) {
+      // external needs the IOException to check connection
+      throw e;
+    } catch (InterruptedException e) {
+      // restore the interrupt flag so that callers can still observe the interruption
+      Thread.currentThread().interrupt();
+      logger.error("Borrow client from pool for node {} failed.", node, e);
+    } catch (Exception e) {
+      // external doesn't care of other exceptions
+      logger.error("Borrow client from pool for node {} failed.", node, e);
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Returns a client to the pool. Nothing happens when either argument is {@code null}; a failure
+   * while returning is logged and not propagated.
+   *
+   * @param node endpoint the client was borrowed for
+   * @param client client to give back
+   */
+  @Override
+  public void returnClient(EndPoint node, E client) {
+    if (client == null || node == null) {
+      return;
+    }
+    try {
+      pool.returnObject(node, client);
+    } catch (Exception e) {
+      logger.error("Return client {} for node {} to pool failed.", client, node, e);
+    }
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java
new file mode 100644
index 0000000000..b35f163257
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+import java.time.Duration;
+
+/**
+ * Immutable bundle of the settings needed to build a client pool: the keyed-pool configuration
+ * and the Thrift client options. Instances are normally created through {@link Builder}.
+ *
+ * @param <T> client type held by the pool
+ */
+public class ClientManagerProperty<T> {
+
+  private final GenericKeyedObjectPoolConfig<T> config;
+
+  // thrift client config
+  private final TProtocolFactory protocolFactory;
+  private final int connectionTimeoutMs;
+  private final int selectorNumOfAsyncClientPool;
+
+  /**
+   * @param config configuration of the keyed object pool
+   * @param protocolFactory Thrift protocol factory used by the clients
+   * @param connectionTimeoutMs connection timeout in milliseconds
+   * @param selectorNumOfAsyncClientPool number of selector managers for async clients
+   */
+  public ClientManagerProperty(
+      GenericKeyedObjectPoolConfig<T> config,
+      TProtocolFactory protocolFactory,
+      int connectionTimeoutMs,
+      int selectorNumOfAsyncClientPool) {
+    this.config = config;
+    this.protocolFactory = protocolFactory;
+    this.connectionTimeoutMs = connectionTimeoutMs;
+    this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool;
+  }
+
+  public GenericKeyedObjectPoolConfig<T> getConfig() {
+    return config;
+  }
+
+  public TProtocolFactory getProtocolFactory() {
+    return protocolFactory;
+  }
+
+  public int getConnectionTimeoutMs() {
+    return connectionTimeoutMs;
+  }
+
+  public int getSelectorNumOfAsyncClientPool() {
+    return selectorNumOfAsyncClientPool;
+  }
+
+  /**
+   * Fluent builder for {@link ClientManagerProperty}. Every option has a default, so only the
+   * values that differ need to be set.
+   *
+   * @param <T> client type held by the pool
+   */
+  public static class Builder<T> {
+
+    // pool config
+    private long waitClientTimeoutMS = 20_000;
+    private int maxConnectionForEachNode = 1_000;
+    private int maxIdleConnectionForEachNode = 1_000;
+
+    // thrift client config
+    private boolean rpcThriftCompressionEnabled = false;
+    private int connectionTimeoutMs = 20_000;
+    private int selectorNumOfAsyncClientPool = 1;
+
+    /** Sets the maximum time, in milliseconds, to wait for a client from the pool. */
+    public Builder<T> setWaitClientTimeoutMS(long waitClientTimeoutMS) {
+      this.waitClientTimeoutMS = waitClientTimeoutMS;
+      return this;
+    }
+
+    /** Sets the maximum number of pooled clients per key. */
+    public Builder<T> setMaxConnectionForEachNode(int maxConnectionForEachNode) {
+      this.maxConnectionForEachNode = maxConnectionForEachNode;
+      return this;
+    }
+
+    /** Sets the maximum number of idle pooled clients per key. */
+    public Builder<T> setMaxIdleConnectionForEachNode(int maxIdleConnectionForEachNode) {
+      this.maxIdleConnectionForEachNode = maxIdleConnectionForEachNode;
+      return this;
+    }
+
+    /** Chooses the compact Thrift protocol when {@code true}, the binary one otherwise. */
+    public Builder<T> setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) {
+      this.rpcThriftCompressionEnabled = rpcThriftCompressionEnabled;
+      return this;
+    }
+
+    /** Sets the connection timeout in milliseconds. */
+    public Builder<T> setConnectionTimeoutMs(int connectionTimeoutMs) {
+      this.connectionTimeoutMs = connectionTimeoutMs;
+      return this;
+    }
+
+    /** Sets the number of selector managers used by async client factories. */
+    public Builder<T> setSelectorNumOfAsyncClientPool(int selectorNumOfAsyncClientPool) {
+      this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool;
+      return this;
+    }
+
+    /**
+     * Creates the property object. The resulting pool configuration always enables the
+     * test-on-borrow and test-on-return flags.
+     *
+     * @return a new {@link ClientManagerProperty} reflecting the current builder state
+     */
+    public ClientManagerProperty<T> build() {
+      GenericKeyedObjectPoolConfig<T> poolConfig = new GenericKeyedObjectPoolConfig<>();
+      poolConfig.setMaxTotalPerKey(maxConnectionForEachNode);
+      poolConfig.setMaxIdlePerKey(maxIdleConnectionForEachNode);
+      poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMS));
+      poolConfig.setTestOnReturn(true);
+      poolConfig.setTestOnBorrow(true);
+      return new ClientManagerProperty<>(
+          poolConfig,
+          rpcThriftCompressionEnabled
+              ? new TCompactProtocol.Factory()
+              : new TBinaryProtocol.Factory(),
+          connectionTimeoutMs,
+          selectorNumOfAsyncClientPool);
+    }
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
new file mode 100644
index 0000000000..11883aefda
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Contract for borrowing clients bound to an {@link EndPoint} and handing them back afterwards.
+ *
+ * @param <E> client type managed by the implementation
+ */
+public interface IClientManager<E> {
+
+  /**
+   * Borrows a client for the given node.
+   *
+   * @param node endpoint the client should be bound to
+   * @return an {@link Optional} holding the client; whether it may be empty is up to the
+   *     implementation
+   * @throws IOException if the implementation reports an I/O problem while obtaining the client
+   */
+  Optional<E> borrowClient(EndPoint node) throws IOException;
+
+  /**
+   * Hands a previously borrowed client back.
+   *
+   * @param node endpoint the client was borrowed for
+   * @param client client to return
+   */
+  void returnClient(EndPoint node, E client);
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
new file mode 100644
index 0000000000..80341bf5f5
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+
+import org.apache.commons.pool2.KeyedObjectPool;
+
+/**
+ * Factory for the keyed object pool used by a {@link ClientManager}.
+ *
+ * @param <E> client type held by the pool
+ */
+public interface IClientPoolFactory<E> {
+
+  /**
+   * Creates a pool of clients keyed by {@link EndPoint}.
+   *
+   * @param manager manager the pool is created for
+   * @return the pool to be used by that manager
+   */
+  KeyedObjectPool<EndPoint, E> createClientPool(
+      ClientManager<E> manager);
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
new file mode 100644
index 0000000000..6119b799a8
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.async;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+import java.io.IOException;
+
+/**
+ * Asynchronous Thrift client for the DataNode internal service that hands itself back to its
+ * {@link ClientManager} once a call completes. We put the client in this module because it is
+ * used by both ConfigNode and DataNode.
+ */
+public class AsyncDataNodeInternalServiceClient extends InternalService.AsyncClient {
+
+  private final EndPoint endpoint;
+  private final ClientManager<AsyncDataNodeInternalServiceClient> clientManager;
+
+  /**
+   * @param protocolFactory Thrift protocol factory
+   * @param connectionTimeout timeout passed to the non-blocking socket wrapper
+   * @param endpoint remote endpoint (ip and port) to connect to
+   * @param tClientManager Thrift async client manager driving this client
+   * @param clientManager manager to return this client to; may be {@code null}
+   * @throws IOException if the non-blocking socket cannot be created
+   */
+  public AsyncDataNodeInternalServiceClient(
+      TProtocolFactory protocolFactory,
+      int connectionTimeout,
+      EndPoint endpoint,
+      TAsyncClientManager tClientManager,
+      ClientManager<AsyncDataNodeInternalServiceClient> clientManager)
+      throws IOException {
+    super(
+        protocolFactory,
+        tClientManager,
+        TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout));
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+  }
+
+  /** Closes the underlying transport and clears the method currently in flight. */
+  public void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
+  /** Returns {@code true} as long as this client still has a transport object. */
+  public boolean isValid() {
+    return ___transport != null;
+  }
+
+  /**
+   * Returns this client to its manager if one is set. Users do not need to call this method; it
+   * is triggered from {@link #onComplete()}.
+   */
+  private void returnSelf() {
+    if (clientManager != null) {
+      clientManager.returnClient(endpoint, this);
+    }
+  }
+
+  /** Lets the superclass finish the call, then gives this client back to the manager. */
+  @Override
+  public void onComplete() {
+    super.onComplete();
+    returnSelf();
+  }
+
+  /** Returns {@code true} when {@code checkReady()} passes without throwing. */
+  public boolean isReady() {
+    try {
+      checkReady();
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  /** Pooled-object factory creating, validating and destroying clients per {@link EndPoint}. */
+  public static class Factory
+      extends AsyncBaseClientFactory<EndPoint, AsyncDataNodeInternalServiceClient> {
+
+    public Factory(
+        ClientManager<AsyncDataNodeInternalServiceClient> clientManager,
+        ClientManagerProperty<AsyncDataNodeInternalServiceClient> clientManagerProperty) {
+      super(clientManager, clientManagerProperty);
+    }
+
+    /** Closes the client held by the pooled object. */
+    @Override
+    public void destroyObject(
+        EndPoint endPoint, PooledObject<AsyncDataNodeInternalServiceClient> pooledObject) {
+      pooledObject.getObject().close();
+    }
+
+    /**
+     * Creates a client for the endpoint, assigning one of the shared selector managers in
+     * round-robin order.
+     *
+     * @throws Exception if the client or a replacement selector manager cannot be created
+     */
+    @Override
+    public PooledObject<AsyncDataNodeInternalServiceClient> makeObject(EndPoint endPoint)
+        throws Exception {
+      // floorMod keeps the index non-negative once the counter wraps past Integer.MAX_VALUE;
+      // the plain % operator would then yield a negative index.
+      int slot = Math.floorMod(clientCnt.incrementAndGet(), tManagers.length);
+      TAsyncClientManager tManager = tManagers[slot];
+      // A slot is null when the shared manager could not be created; fall back to a fresh one.
+      tManager = tManager == null ? new TAsyncClientManager() : tManager;
+      return new DefaultPooledObject<>(
+          new AsyncDataNodeInternalServiceClient(
+              clientManagerProperty.getProtocolFactory(),
+              clientManagerProperty.getConnectionTimeoutMs(),
+              endPoint,
+              tManager,
+              clientManager));
+    }
+
+    /** A pooled object is valid when it holds a client whose {@code isValid()} is true. */
+    @Override
+    public boolean validateObject(
+        EndPoint endPoint, PooledObject<AsyncDataNodeInternalServiceClient> pooledObject) {
+      return pooledObject.getObject() != null && pooledObject.getObject().isValid();
+    }
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
new file mode 100644
index 0000000000..a40104bbbc
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.sync;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.BaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.net.SocketException;
+
+/**
+ * Synchronous Thrift client for the DataNode internal service that can hand itself back to its
+ * {@link ClientManager}. We put the client in this module because it is used by both ConfigNode
+ * and DataNode.
+ */
+public class SyncDataNodeInternalServiceClient extends InternalService.Client {
+
+  private final EndPoint endpoint;
+  private final ClientManager<SyncDataNodeInternalServiceClient> clientManager;
+
+  /**
+   * Builds the client and opens its transport.
+   *
+   * @param protocolFactory Thrift protocol factory
+   * @param connectionTimeout timeout passed to the socket
+   * @param endpoint remote endpoint (ip and port) to connect to
+   * @param clientManager manager to return this client to; may be {@code null}
+   * @throws TTransportException if the transport cannot be created or opened
+   */
+  public SyncDataNodeInternalServiceClient(
+      TProtocolFactory protocolFactory,
+      int connectionTimeout,
+      EndPoint endpoint,
+      ClientManager<SyncDataNodeInternalServiceClient> clientManager)
+      throws TTransportException {
+    super(
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    endpoint.getIp(),
+                    endpoint.getPort(),
+                    connectionTimeout))));
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+    getInputProtocol().getTransport().open();
+  }
+
+  /** Gives this client back to its manager; does nothing when no manager is set. */
+  public void returnSelf() {
+    if (clientManager == null) {
+      return;
+    }
+    clientManager.returnClient(endpoint, this);
+  }
+
+  /** Applies the timeout to the transport, which is shared by input and output. */
+  public void setTimeout(int timeout) {
+    TimeoutChangeableTransport transport =
+        (TimeoutChangeableTransport) getInputProtocol().getTransport();
+    transport.setTimeout(timeout);
+  }
+
+  /** Closes the underlying transport. */
+  public void close() {
+    getInputProtocol().getTransport().close();
+  }
+
+  /** Reads the timeout currently set on the transport. */
+  public int getTimeout() throws SocketException {
+    TimeoutChangeableTransport transport =
+        (TimeoutChangeableTransport) getInputProtocol().getTransport();
+    return transport.getTimeOut();
+  }
+
+  /** Pooled-object factory creating, validating and destroying clients per {@link EndPoint}. */
+  public static class Factory
+      extends BaseClientFactory<EndPoint, SyncDataNodeInternalServiceClient> {
+
+    public Factory(
+        ClientManager<SyncDataNodeInternalServiceClient> clientManager,
+        ClientManagerProperty<SyncDataNodeInternalServiceClient> clientManagerProperty) {
+      super(clientManager, clientManagerProperty);
+    }
+
+    /** Closes the client held by the pooled object. */
+    @Override
+    public void destroyObject(
+        EndPoint endpoint, PooledObject<SyncDataNodeInternalServiceClient> pooledObject) {
+      SyncDataNodeInternalServiceClient client = pooledObject.getObject();
+      client.close();
+    }
+
+    /** Creates and connects a new client for the endpoint and wraps it for the pool. */
+    @Override
+    public PooledObject<SyncDataNodeInternalServiceClient> makeObject(EndPoint endpoint)
+        throws Exception {
+      SyncDataNodeInternalServiceClient client =
+          new SyncDataNodeInternalServiceClient(
+              clientManagerProperty.getProtocolFactory(),
+              clientManagerProperty.getConnectionTimeoutMs(),
+              endpoint,
+              clientManager);
+      return new DefaultPooledObject<>(client);
+    }
+
+    /** A pooled object is valid when it holds a client whose transport is still open. */
+    @Override
+    public boolean validateObject(
+        EndPoint endpoint, PooledObject<SyncDataNodeInternalServiceClient> pooledObject) {
+      SyncDataNodeInternalServiceClient client = pooledObject.getObject();
+      if (client == null) {
+        return false;
+      }
+      return client.getInputProtocol().getTransport().isOpen();
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java 
b/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java
new file mode 100644
index 0000000000..63f6c7a45c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.commons.client.IClientPoolFactory;
+import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.client.async.AsyncConfigNodeIServiceClient;
+import org.apache.iotdb.db.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+
+/**
+ * Holder for the {@link IClientPoolFactory} implementations used on the data node side. Each
+ * nested factory builds a {@link GenericKeyedObjectPool} of one client type, configured from the
+ * current {@link IoTDBConfig}.
+ */
+public class ClientPoolFactory {
+
+  private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+
+  /** Creates the keyed pool of {@link SyncConfigNodeIServiceClient} instances. */
+  public static class SyncConfigNodeIServiceClientPoolFactory
+      implements IClientPoolFactory<SyncConfigNodeIServiceClient> {
+    @Override
+    public KeyedObjectPool<EndPoint, SyncConfigNodeIServiceClient> createClientPool(
+        ClientManager<SyncConfigNodeIServiceClient> manager) {
+      ClientManagerProperty<SyncConfigNodeIServiceClient> poolProperty =
+          new ClientManagerProperty.Builder<SyncConfigNodeIServiceClient>()
+              .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+              .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
+              .build();
+      SyncConfigNodeIServiceClient.Factory clientFactory =
+          new SyncConfigNodeIServiceClient.Factory(manager, poolProperty);
+      return new GenericKeyedObjectPool<>(clientFactory, poolProperty.getConfig());
+    }
+  }
+
+  /**
+   * Creates the keyed pool of {@link AsyncConfigNodeIServiceClient} instances; additionally passes
+   * the configured selector count to the property builder.
+   */
+  public static class AsyncConfigNodeIServiceClientPoolFactory
+      implements IClientPoolFactory<AsyncConfigNodeIServiceClient> {
+    @Override
+    public KeyedObjectPool<EndPoint, AsyncConfigNodeIServiceClient> createClientPool(
+        ClientManager<AsyncConfigNodeIServiceClient> manager) {
+      ClientManagerProperty<AsyncConfigNodeIServiceClient> poolProperty =
+          new ClientManagerProperty.Builder<AsyncConfigNodeIServiceClient>()
+              .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+              .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
+              .setSelectorNumOfAsyncClientPool(conf.getSelectorNumOfClientPool())
+              .build();
+      AsyncConfigNodeIServiceClient.Factory clientFactory =
+          new AsyncConfigNodeIServiceClient.Factory(manager, poolProperty);
+      return new GenericKeyedObjectPool<>(clientFactory, poolProperty.getConfig());
+    }
+  }
+
+  /** Creates the keyed pool of {@link SyncDataNodeInternalServiceClient} instances. */
+  public static class SyncDataNodeInternalServiceClientPoolFactory
+      implements IClientPoolFactory<SyncDataNodeInternalServiceClient> {
+    @Override
+    public KeyedObjectPool<EndPoint, SyncDataNodeInternalServiceClient> createClientPool(
+        ClientManager<SyncDataNodeInternalServiceClient> manager) {
+      ClientManagerProperty<SyncDataNodeInternalServiceClient> poolProperty =
+          new ClientManagerProperty.Builder<SyncDataNodeInternalServiceClient>()
+              .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+              .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
+              .build();
+      SyncDataNodeInternalServiceClient.Factory clientFactory =
+          new SyncDataNodeInternalServiceClient.Factory(manager, poolProperty);
+      return new GenericKeyedObjectPool<>(clientFactory, poolProperty.getConfig());
+    }
+  }
+
+  /**
+   * Creates the keyed pool of {@link AsyncDataNodeInternalServiceClient} instances; additionally
+   * passes the configured selector count to the property builder.
+   */
+  public static class AsyncDataNodeInternalServiceClientPoolFactory
+      implements IClientPoolFactory<AsyncDataNodeInternalServiceClient> {
+    @Override
+    public KeyedObjectPool<EndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
+        ClientManager<AsyncDataNodeInternalServiceClient> manager) {
+      ClientManagerProperty<AsyncDataNodeInternalServiceClient> poolProperty =
+          new ClientManagerProperty.Builder<AsyncDataNodeInternalServiceClient>()
+              .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+              .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
+              .setSelectorNumOfAsyncClientPool(conf.getSelectorNumOfClientPool())
+              .build();
+      AsyncDataNodeInternalServiceClient.Factory clientFactory =
+          new AsyncDataNodeInternalServiceClient.Factory(manager, poolProperty);
+      return new GenericKeyedObjectPool<>(clientFactory, poolProperty.getConfig());
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java
 
b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java
new file mode 100644
index 0000000000..00ac180429
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client.async;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
+import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+import java.io.IOException;
+
+public class AsyncConfigNodeIServiceClient extends ConfigIService.AsyncClient {
+
+  private final EndPoint endpoint;
+  private final ClientManager<AsyncConfigNodeIServiceClient> clientManager;
+
+  public AsyncConfigNodeIServiceClient(
+      TProtocolFactory protocolFactory,
+      int connectionTimeout,
+      EndPoint endpoint,
+      TAsyncClientManager tClientManager,
+      ClientManager<AsyncConfigNodeIServiceClient> clientManager)
+      throws IOException {
+    super(
+        protocolFactory,
+        tClientManager,
+        TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), 
connectionTimeout));
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+  }
+
+  public void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
+  public boolean isValid() {
+    return ___transport != null;
+  }
+
+  /**
+   * return self if clientPool is not null, the method doesn't need to call by 
user, it will trigger
+   * once client transport complete.
+   */
+  private void returnSelf() {
+    if (clientManager != null) {
+      clientManager.returnClient(endpoint, this);
+    }
+  }
+
+  @Override
+  public void onComplete() {
+    super.onComplete();
+    returnSelf();
+  }
+
+  public boolean isReady() {
+    try {
+      checkReady();
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  public static class Factory
+      extends AsyncBaseClientFactory<EndPoint, AsyncConfigNodeIServiceClient> {
+
+    public Factory(
+        ClientManager<AsyncConfigNodeIServiceClient> clientManager,
+        ClientManagerProperty<AsyncConfigNodeIServiceClient> 
clientManagerProperty) {
+      super(clientManager, clientManagerProperty);
+    }
+
+    @Override
+    public void destroyObject(
+        EndPoint endPoint, PooledObject<AsyncConfigNodeIServiceClient> 
pooledObject) {
+      pooledObject.getObject().close();
+    }
+
+    @Override
+    public PooledObject<AsyncConfigNodeIServiceClient> makeObject(EndPoint 
endPoint)
+        throws Exception {
+      TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % 
tManagers.length];
+      tManager = tManager == null ? new TAsyncClientManager() : tManager;
+      return new DefaultPooledObject<>(
+          new AsyncConfigNodeIServiceClient(
+              clientManagerProperty.getProtocolFactory(),
+              clientManagerProperty.getConnectionTimeoutMs(),
+              endPoint,
+              tManager,
+              clientManager));
+    }
+
+    @Override
+    public boolean validateObject(
+        EndPoint endPoint, PooledObject<AsyncConfigNodeIServiceClient> 
pooledObject) {
+      return pooledObject.getObject() != null && 
pooledObject.getObject().isValid();
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java
 
b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java
new file mode 100644
index 0000000000..1701083464
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client.async;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+import java.io.IOException;
+
+/**
+ * Asynchronous Thrift client for {@link DataBlockService} that hands itself back to the {@link
+ * ClientManager} it was created for once a call completes.
+ */
+public class AsyncDataNodeDataBlockServiceClient extends DataBlockService.AsyncClient {
+
+  private final EndPoint endpoint;
+  private final ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager;
+
+  /**
+   * Creates a client over a non-blocking socket to {@code endpoint}.
+   *
+   * @param protocolFactory protocol factory passed to the Thrift async client
+   * @param connectionTimeout timeout passed to {@code TNonblockingSocketWrapper.wrap}
+   * @param endpoint target ip and port; also the key used when returning this client
+   * @param tClientManager Thrift async client manager that drives this client
+   * @param clientManager manager to return this client to; may be {@code null}
+   * @throws IOException declared by the socket wrapper / Thrift async client construction
+   */
+  public AsyncDataNodeDataBlockServiceClient(
+      TProtocolFactory protocolFactory,
+      int connectionTimeout,
+      EndPoint endpoint,
+      TAsyncClientManager tClientManager,
+      ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager)
+      throws IOException {
+    super(
+        protocolFactory,
+        tClientManager,
+        TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout));
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+  }
+
+  /** Closes the transport and clears the reference to the current method call. */
+  public void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
+  /** Returns whether this client holds a transport; the connection itself is not probed. */
+  public boolean isValid() {
+    return ___transport != null;
+  }
+
+  /**
+   * Returns this client to its {@link ClientManager} if one was supplied. Callers do not need to
+   * invoke this themselves; it is triggered from {@link #onComplete()}.
+   */
+  private void returnSelf() {
+    if (clientManager != null) {
+      clientManager.returnClient(endpoint, this);
+    }
+  }
+
+  /**
+   * Runs the superclass completion handling, then returns this client to the pool.
+   *
+   * <p>NOTE(review): no error callback is overridden in this class, so a call that fails does not
+   * appear to return the client to the pool - confirm this is handled elsewhere.
+   */
+  @Override
+  public void onComplete() {
+    super.onComplete();
+    returnSelf();
+  }
+
+  /**
+   * Reports whether {@code checkReady()} passes, translating any exception it throws into {@code
+   * false}.
+   */
+  public boolean isReady() {
+    try {
+      checkReady();
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  /**
+   * Pool object factory that creates, validates and destroys {@link
+   * AsyncDataNodeDataBlockServiceClient} instances keyed by {@link EndPoint}.
+   */
+  public static class Factory
+      extends AsyncBaseClientFactory<EndPoint, AsyncDataNodeDataBlockServiceClient> {
+
+    public Factory(
+        ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager,
+        ClientManagerProperty<AsyncDataNodeDataBlockServiceClient> clientManagerProperty) {
+      super(clientManager, clientManagerProperty);
+    }
+
+    /** Closes the pooled client when the pool discards it. */
+    @Override
+    public void destroyObject(
+        EndPoint endPoint, PooledObject<AsyncDataNodeDataBlockServiceClient> pooledObject) {
+      pooledObject.getObject().close();
+    }
+
+    /**
+     * Builds a new client for {@code endPoint}, picking one of the shared Thrift async client
+     * managers in round-robin order.
+     */
+    @Override
+    public PooledObject<AsyncDataNodeDataBlockServiceClient> makeObject(EndPoint endPoint)
+        throws Exception {
+      // floorMod keeps the slot non-negative even if the counter has wrapped to a negative
+      // value; the plain % operator would then produce a negative index and throw.
+      int slot = Math.floorMod(clientCnt.incrementAndGet(), tManagers.length);
+      TAsyncClientManager tManager = tManagers[slot];
+      if (tManager == null) {
+        // Fall back to a dedicated manager when the shared slot is empty.
+        tManager = new TAsyncClientManager();
+      }
+      return new DefaultPooledObject<>(
+          new AsyncDataNodeDataBlockServiceClient(
+              clientManagerProperty.getProtocolFactory(),
+              clientManagerProperty.getConnectionTimeoutMs(),
+              endPoint,
+              tManager,
+              clientManager));
+    }
+
+    /** A pooled client is valid only if it exists and reports {@link #isValid()}. */
+    @Override
+    public boolean validateObject(
+        EndPoint endPoint, PooledObject<AsyncDataNodeDataBlockServiceClient> pooledObject) {
+      AsyncDataNodeDataBlockServiceClient client = pooledObject.getObject();
+      return client != null && client.isValid();
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java
 
b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java
new file mode 100644
index 0000000000..60e01bfc0c
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client.sync;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.BaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.net.SocketException;
+
+/**
+ * Synchronous Thrift client for {@link ConfigIService} that remembers the {@link EndPoint} and
+ * {@link ClientManager} it belongs to so it can be returned to its pool.
+ */
+public class SyncConfigNodeIServiceClient extends ConfigIService.Client {
+
+  private final EndPoint endpoint;
+  private final ClientManager<SyncConfigNodeIServiceClient> clientManager;
+
+  /**
+   * Builds the protocol over a {@link TSocket} to {@code endpoint} and opens the transport.
+   *
+   * @param protocolFactory factory producing the protocol that wraps the transport
+   * @param connectionTimeout timeout handed to the {@link TSocket} constructor
+   * @param endpoint target ip and port; also the key used when returning this client
+   * @param clientManager manager to return this client to; may be {@code null}
+   * @throws TTransportException if the transport cannot be created or opened
+   */
+  public SyncConfigNodeIServiceClient(
+      TProtocolFactory protocolFactory,
+      int connectionTimeout,
+      EndPoint endpoint,
+      ClientManager<SyncConfigNodeIServiceClient> clientManager)
+      throws TTransportException {
+    super(
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    endpoint.getIp(),
+                    endpoint.getPort(),
+                    connectionTimeout))));
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+    getInputProtocol().getTransport().open();
+  }
+
+  /** Hands this client back to its manager; does nothing when no manager was supplied. */
+  public void returnSelf() {
+    if (clientManager == null) {
+      return;
+    }
+    clientManager.returnClient(endpoint, this);
+  }
+
+  /**
+   * Applies a new timeout to the transport behind this client.
+   *
+   * @param timeout the value forwarded to {@code TimeoutChangeableTransport#setTimeout}
+   */
+  public void setTimeout(int timeout) {
+    // Input and output share a single transport, so one update covers both directions.
+    TimeoutChangeableTransport transport =
+        (TimeoutChangeableTransport) getInputProtocol().getTransport();
+    transport.setTimeout(timeout);
+  }
+
+  /** Closes the transport obtained from this client's input protocol. */
+  public void close() {
+    getInputProtocol().getTransport().close();
+  }
+
+  /**
+   * Reads the timeout currently set on the transport behind this client.
+   *
+   * @return the value reported by the transport's {@code getTimeOut()}
+   * @throws SocketException propagated from the transport's {@code getTimeOut()} call
+   */
+  public int getTimeout() throws SocketException {
+    TimeoutChangeableTransport transport =
+        (TimeoutChangeableTransport) getInputProtocol().getTransport();
+    return transport.getTimeOut();
+  }
+
+  /**
+   * Pool object factory that creates, validates and destroys {@link SyncConfigNodeIServiceClient}
+   * instances keyed by {@link EndPoint}.
+   */
+  public static class Factory extends BaseClientFactory<EndPoint, SyncConfigNodeIServiceClient> {
+
+    public Factory(
+        ClientManager<SyncConfigNodeIServiceClient> clientManager,
+        ClientManagerProperty<SyncConfigNodeIServiceClient> clientManagerProperty) {
+      super(clientManager, clientManagerProperty);
+    }
+
+    /** Closes the pooled client when the pool discards it. */
+    @Override
+    public void destroyObject(
+        EndPoint endpoint, PooledObject<SyncConfigNodeIServiceClient> pooledObject) {
+      SyncConfigNodeIServiceClient client = pooledObject.getObject();
+      client.close();
+    }
+
+    /** Builds and opens a new client for {@code endpoint} and wraps it for the pool. */
+    @Override
+    public PooledObject<SyncConfigNodeIServiceClient> makeObject(EndPoint endpoint)
+        throws Exception {
+      SyncConfigNodeIServiceClient client =
+          new SyncConfigNodeIServiceClient(
+              clientManagerProperty.getProtocolFactory(),
+              clientManagerProperty.getConnectionTimeoutMs(),
+              endpoint,
+              clientManager);
+      return new DefaultPooledObject<>(client);
+    }
+
+    /** A pooled client is valid only if it exists and its transport is still open. */
+    @Override
+    public boolean validateObject(
+        EndPoint endpoint, PooledObject<SyncConfigNodeIServiceClient> pooledObject) {
+      SyncConfigNodeIServiceClient client = pooledObject.getObject();
+      if (client == null) {
+        return false;
+      }
+      return client.getInputProtocol().getTransport().isOpen();
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java
 
b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java
new file mode 100644
index 0000000000..f6188709a9
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client.sync;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.BaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.net.SocketException;
+
+/**
+ * Synchronous Thrift client for {@link DataBlockService} that remembers the {@link EndPoint} and
+ * {@link ClientManager} it belongs to so it can be returned to its pool.
+ */
+public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client {
+
+  private final EndPoint endpoint;
+  private final ClientManager<SyncDataNodeDataBlockServiceClient> clientManager;
+
+  /**
+   * Builds the protocol over a {@link TSocket} to {@code endpoint} and opens the transport.
+   *
+   * @param protocolFactory factory producing the protocol that wraps the transport
+   * @param connectionTimeout timeout handed to the {@link TSocket} constructor
+   * @param endpoint target ip and port; also the key used when returning this client
+   * @param clientManager manager to return this client to; may be {@code null}
+   * @throws TTransportException if the transport cannot be created or opened
+   */
+  public SyncDataNodeDataBlockServiceClient(
+      TProtocolFactory protocolFactory,
+      int connectionTimeout,
+      EndPoint endpoint,
+      ClientManager<SyncDataNodeDataBlockServiceClient> clientManager)
+      throws TTransportException {
+    super(
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    endpoint.getIp(),
+                    endpoint.getPort(),
+                    connectionTimeout))));
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+    getInputProtocol().getTransport().open();
+  }
+
+  /** Hands this client back to its manager; does nothing when no manager was supplied. */
+  public void returnSelf() {
+    if (clientManager == null) {
+      return;
+    }
+    clientManager.returnClient(endpoint, this);
+  }
+
+  /**
+   * Applies a new timeout to the transport behind this client.
+   *
+   * @param timeout the value forwarded to {@code TimeoutChangeableTransport#setTimeout}
+   */
+  public void setTimeout(int timeout) {
+    // Input and output share a single transport, so one update covers both directions.
+    TimeoutChangeableTransport transport =
+        (TimeoutChangeableTransport) getInputProtocol().getTransport();
+    transport.setTimeout(timeout);
+  }
+
+  /** Closes the transport obtained from this client's input protocol. */
+  public void close() {
+    getInputProtocol().getTransport().close();
+  }
+
+  /**
+   * Reads the timeout currently set on the transport behind this client.
+   *
+   * @return the value reported by the transport's {@code getTimeOut()}
+   * @throws SocketException propagated from the transport's {@code getTimeOut()} call
+   */
+  public int getTimeout() throws SocketException {
+    TimeoutChangeableTransport transport =
+        (TimeoutChangeableTransport) getInputProtocol().getTransport();
+    return transport.getTimeOut();
+  }
+
+  /**
+   * Pool object factory that creates, validates and destroys {@link
+   * SyncDataNodeDataBlockServiceClient} instances keyed by {@link EndPoint}.
+   */
+  public static class Factory
+      extends BaseClientFactory<EndPoint, SyncDataNodeDataBlockServiceClient> {
+
+    public Factory(
+        ClientManager<SyncDataNodeDataBlockServiceClient> clientManager,
+        ClientManagerProperty<SyncDataNodeDataBlockServiceClient> clientManagerProperty) {
+      super(clientManager, clientManagerProperty);
+    }
+
+    /** Closes the pooled client when the pool discards it. */
+    @Override
+    public void destroyObject(
+        EndPoint endpoint, PooledObject<SyncDataNodeDataBlockServiceClient> pooledObject) {
+      SyncDataNodeDataBlockServiceClient client = pooledObject.getObject();
+      client.close();
+    }
+
+    /** Builds and opens a new client for {@code endpoint} and wraps it for the pool. */
+    @Override
+    public PooledObject<SyncDataNodeDataBlockServiceClient> makeObject(EndPoint endpoint)
+        throws Exception {
+      SyncDataNodeDataBlockServiceClient client =
+          new SyncDataNodeDataBlockServiceClient(
+              clientManagerProperty.getProtocolFactory(),
+              clientManagerProperty.getConnectionTimeoutMs(),
+              endpoint,
+              clientManager);
+      return new DefaultPooledObject<>(client);
+    }
+
+    /** A pooled client is valid only if it exists and its transport is still open. */
+    @Override
+    public boolean validateObject(
+        EndPoint endpoint, PooledObject<SyncDataNodeDataBlockServiceClient> pooledObject) {
+      SyncDataNodeDataBlockServiceClient client = pooledObject.getObject();
+      if (client == null) {
+        return false;
+      }
+      return client.getInputProtocol().getTransport().isOpen();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c702a9dba6..e9ee063e4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -875,6 +875,18 @@ public class IoTDBConfig {
   /** Thread keep alive time in ms of data block manager. */
   private int dataBlockManagerKeepAliveTimeInMs = 1000;
 
+  /**
+   * Thrift socket and connection timeout, in milliseconds, for the pooled clients a data node
+   * uses to reach config nodes and other data nodes.
+   */
+  private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20);
+
+  /**
+   * Number of selector threads (TAsyncClientManager instances) a client pool distributes among
+   * its clients: a quarter of the available processors, but never fewer than one.
+   */
+  private final int selectorNumOfClientPool =
+      Math.max(1, Runtime.getRuntime().availableProcessors() / 4);
+
   public float getUdfMemoryBudgetInMB() {
     return udfMemoryBudgetInMB;
   }
@@ -2769,6 +2781,18 @@ public class IoTDBConfig {
     this.dataBlockManagerKeepAliveTimeInMs = dataBlockManagerKeepAliveTimeInMs;
   }
 
+  /** Returns the configured connection timeout, in milliseconds. */
+  public int getConnectionTimeoutInMS() {
+    return connectionTimeoutInMS;
+  }
+
+  /** Replaces the configured connection timeout, in milliseconds. */
+  public void setConnectionTimeoutInMS(int connectionTimeoutInMS) {
+    this.connectionTimeoutInMS = connectionTimeoutInMS;
+  }
+
+  /** Returns the number of selector threads a client pool uses; fixed at construction. */
+  public int getSelectorNumOfClientPool() {
+    return selectorNumOfClientPool;
+  }
+
   public boolean isMppMode() {
     return mppMode;
   }

Reply via email to