This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 085928af01 [IOTDB-4039] Distinguish the selector thread-name of ClientPools (#6927)
085928af01 is described below

commit 085928af01844adac76654577ff9e225122d4e01
Author: 任宇华 <[email protected]>
AuthorDate: Wed Aug 10 19:47:53 2022 +0800

    [IOTDB-4039] Distinguish the selector thread-name of ClientPools (#6927)
---
 .../client/AsyncMultiLeaderServiceClient.java      |  5 ++--
 .../client/MultiLeaderConsensusClientPool.java     |  5 +++-
 .../commons/client/AsyncBaseClientFactory.java     | 24 +++++++++++++------
 .../iotdb/commons/client/ClientPoolFactory.java    | 15 +++++++++---
 .../AsyncConfigNodeHeartbeatServiceClient.java     |  5 ++--
 .../async/AsyncConfigNodeIServiceClient.java       |  5 ++--
 .../async/AsyncDataNodeHeartbeatServiceClient.java |  5 ++--
 .../async/AsyncDataNodeInternalServiceClient.java  |  5 ++--
 .../AsyncDataNodeMPPDataExchangeServiceClient.java |  5 ++--
 .../iotdb/commons/client/ClientManagerTest.java    |  4 +++-
 .../iotdb/db/client/DataNodeClientPoolFactory.java | 28 +++++++---------------
 11 files changed, 62 insertions(+), 44 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/AsyncMultiLeaderServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/AsyncMultiLeaderServiceClient.java
index 98e2e82b18..e4a9f4e3c4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/AsyncMultiLeaderServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/AsyncMultiLeaderServiceClient.java
@@ -112,8 +112,9 @@ public class AsyncMultiLeaderServiceClient extends 
MultiLeaderConsensusIService.
 
     public Factory(
         ClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ClientFactoryProperty clientFactoryProperty,
+        String threadName) {
+      super(clientManager, clientFactoryProperty, threadName);
     }
 
     @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
index e3687edf8f..44d1002943 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
@@ -37,6 +37,8 @@ public class MultiLeaderConsensusClientPool {
       implements IClientPoolFactory<TEndPoint, AsyncMultiLeaderServiceClient> {
 
     private final MultiLeaderConfig config;
+    private static final String MULTI_LEADER_CONSENSUS_CLIENT_POOL_THREAD_NAME 
=
+        "MultiLeaderConsensusClientPool";
 
     public AsyncMultiLeaderServiceClientPoolFactory(MultiLeaderConfig config) {
       this.config = config;
@@ -53,7 +55,8 @@ public class MultiLeaderConsensusClientPool {
                   
.setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
                   .setSelectorNumOfAsyncClientManager(
                       config.getRpc().getSelectorNumOfClientManager())
-                  .build()),
+                  .build(),
+              MULTI_LEADER_CONSENSUS_CLIENT_POOL_THREAD_NAME),
           new 
ClientPoolProperty.Builder<AsyncMultiLeaderServiceClient>().build().getConfig());
     }
   }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
index b6cb485a9c..73fd0dca22 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
@@ -25,23 +25,33 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 public abstract class AsyncBaseClientFactory<K, V> extends 
BaseClientFactory<K, V> {
 
   private static final Logger logger = 
LoggerFactory.getLogger(AsyncBaseClientFactory.class);
   protected TAsyncClientManager[] tManagers;
   protected AtomicInteger clientCnt = new AtomicInteger();
+  private static final String THRIFT_THREAD_NAME = 
"TAsyncClientManager#SelectorThread";
 
   protected AsyncBaseClientFactory(
-      ClientManager<K, V> clientManager, ClientFactoryProperty 
clientFactoryProperty) {
+      ClientManager<K, V> clientManager,
+      ClientFactoryProperty clientFactoryProperty,
+      String threadName) {
     super(clientManager, clientFactoryProperty);
-    tManagers = new 
TAsyncClientManager[clientFactoryProperty.getSelectorNumOfAsyncClientPool()];
-    for (int i = 0; i < tManagers.length; i++) {
-      try {
-        tManagers[i] = new TAsyncClientManager();
-      } catch (IOException e) {
-        logger.error("Cannot create Async client factory", e);
+    synchronized (this) {
+      tManagers = new 
TAsyncClientManager[clientFactoryProperty.getSelectorNumOfAsyncClientPool()];
+      for (int i = 0; i < tManagers.length; i++) {
+        try {
+          tManagers[i] = new TAsyncClientManager();
+        } catch (IOException e) {
+          logger.error("Cannot create Async client factory", e);
+        }
       }
+      Thread.getAllStackTraces().keySet().stream()
+          .filter(thread -> thread.getName().contains(THRIFT_THREAD_NAME))
+          .collect(Collectors.toList())
+          .forEach(thread -> thread.setName(threadName + "-selector" + "-" + 
thread.getId()));
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 66f9428864..1b64be7657 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -33,6 +33,12 @@ import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
 public class ClientPoolFactory {
 
   private static final CommonConfig conf = 
CommonDescriptor.getInstance().getConfig();
+  private static final String DATA_NODE_CLIENT_POOL_THREAD_NAME =
+      "AsyncDataNodeInternalServiceClientPool";
+  private static final String CONFIG_NODE_HEARTBEAT_CLIENT_POOL_THREAD_NAME =
+      "AsyncConfigNodeHeartbeatServiceClientPool";
+  private static final String DATA_NODE_HEARTBEAT_CLIENT_POOL_THREAD_NAME =
+      "AsyncDataNodeHeartbeatServiceClientPool";
 
   private ClientPoolFactory() {}
 
@@ -65,7 +71,8 @@ public class ClientPoolFactory {
                   .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
                   
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
                   
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build()),
+                  .build(),
+              DATA_NODE_CLIENT_POOL_THREAD_NAME),
           new 
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
     }
   }
@@ -82,7 +89,8 @@ public class ClientPoolFactory {
                   .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
                   
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
                   
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build()),
+                  .build(),
+              CONFIG_NODE_HEARTBEAT_CLIENT_POOL_THREAD_NAME),
           new 
ClientPoolProperty.Builder<AsyncConfigNodeHeartbeatServiceClient>()
               .build()
               .getConfig());
@@ -101,7 +109,8 @@ public class ClientPoolFactory {
                   .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
                   
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
                   
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build()),
+                  .build(),
+              DATA_NODE_HEARTBEAT_CLIENT_POOL_THREAD_NAME),
           new ClientPoolProperty.Builder<AsyncDataNodeHeartbeatServiceClient>()
               .build()
               .getConfig());
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
index 65fa38a873..38b12d2370 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
@@ -107,8 +107,9 @@ public class AsyncConfigNodeHeartbeatServiceClient extends 
IConfigNodeRPCService
 
     public Factory(
         ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> 
clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ClientFactoryProperty clientFactoryProperty,
+        String threadName) {
+      super(clientManager, clientFactoryProperty, threadName);
     }
 
     @Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
index 5b51d845c6..32219dcbfb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
@@ -112,8 +112,9 @@ public class AsyncConfigNodeIServiceClient extends 
IConfigNodeRPCService.AsyncCl
 
     public Factory(
         ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ClientFactoryProperty clientFactoryProperty,
+        String threadName) {
+      super(clientManager, clientFactoryProperty, threadName);
     }
 
     @Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
index 41054e165d..2812886336 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
@@ -107,8 +107,9 @@ public class AsyncDataNodeHeartbeatServiceClient extends 
IDataNodeRPCService.Asy
 
     public Factory(
         ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> 
clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ClientFactoryProperty clientFactoryProperty,
+        String threadName) {
+      super(clientManager, clientFactoryProperty, threadName);
     }
 
     @Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 3a4fe1f68e..1cb0529971 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -124,8 +124,9 @@ public class AsyncDataNodeInternalServiceClient extends 
IDataNodeRPCService.Asyn
 
     public Factory(
         ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> 
clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ClientFactoryProperty clientFactoryProperty,
+        String threadName) {
+      super(clientManager, clientFactoryProperty, threadName);
     }
 
     @Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
index 01623e6b81..667f3189b1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
@@ -113,8 +113,9 @@ public class AsyncDataNodeMPPDataExchangeServiceClient 
extends MPPDataExchangeSe
 
     public Factory(
         ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> 
clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ClientFactoryProperty clientFactoryProperty,
+        String threadName) {
+      super(clientManager, clientFactoryProperty, threadName);
     }
 
     @Override
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
index d0bb75ae50..05169af548 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
@@ -447,7 +447,9 @@ public class ClientManagerTest {
         ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
           new AsyncDataNodeInternalServiceClient.Factory(
-              manager, new ClientFactoryProperty.Builder().build()),
+              manager,
+              new ClientFactoryProperty.Builder().build(),
+              "AsyncDataNodeInternalServiceClientPool"),
           new 
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
index f47fba59d5..b405101b2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ClientPoolProperty;
 import org.apache.iotdb.commons.client.IClientPoolFactory;
 import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
-import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import 
org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
@@ -40,6 +39,10 @@ import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
 public class DataNodeClientPoolFactory {
 
   private static final IoTDBConfig conf = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final String CONFIG_NODE_CLIENT_POOL_THREAD_NAME =
+      "AsyncConfigNodeIServiceClientPool";
+  private static final String 
DATA_NODE_MPP_DATA_EXCHANGE_CLIENT_POOL_THREAD_NAME =
+      "AsyncDataNodeMPPDataExchangeServiceClientPool";
 
   private DataNodeClientPoolFactory() {}
 
@@ -72,7 +75,8 @@ public class DataNodeClientPoolFactory {
                   .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
                   
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
                   
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build()),
+                  .build(),
+              CONFIG_NODE_CLIENT_POOL_THREAD_NAME),
           new 
ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>().build().getConfig());
     }
   }
@@ -94,23 +98,6 @@ public class DataNodeClientPoolFactory {
     }
   }
 
-  public static class AsyncDataNodeInternalServiceClientPoolFactory
-      implements IClientPoolFactory<TEndPoint, 
AsyncDataNodeInternalServiceClient> {
-    @Override
-    public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> 
createClientPool(
-        ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
-      return new GenericKeyedObjectPool<>(
-          new AsyncDataNodeInternalServiceClient.Factory(
-              manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
-                  
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
-                  
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build()),
-          new 
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
-    }
-  }
-
   public static class SyncDataNodeMPPDataExchangeServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, 
SyncDataNodeMPPDataExchangeServiceClient> {
     @Override
@@ -142,7 +129,8 @@ public class DataNodeClientPoolFactory {
                   .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
                   
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
                   
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build()),
+                  .build(),
+              DATA_NODE_MPP_DATA_EXCHANGE_CLIENT_POOL_THREAD_NAME),
           new 
ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
               .build()
               .getConfig());

Reply via email to