This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 085928af01 [IOTDB-4039] Distinguish the selector thread-name of ClientPools (#6927)
085928af01 is described below
commit 085928af01844adac76654577ff9e225122d4e01
Author: 任宇华 <[email protected]>
AuthorDate: Wed Aug 10 19:47:53 2022 +0800
[IOTDB-4039] Distinguish the selector thread-name of ClientPools (#6927)
---
.../client/AsyncMultiLeaderServiceClient.java | 5 ++--
.../client/MultiLeaderConsensusClientPool.java | 5 +++-
.../commons/client/AsyncBaseClientFactory.java | 24 +++++++++++++------
.../iotdb/commons/client/ClientPoolFactory.java | 15 +++++++++---
.../AsyncConfigNodeHeartbeatServiceClient.java | 5 ++--
.../async/AsyncConfigNodeIServiceClient.java | 5 ++--
.../async/AsyncDataNodeHeartbeatServiceClient.java | 5 ++--
.../async/AsyncDataNodeInternalServiceClient.java | 5 ++--
.../AsyncDataNodeMPPDataExchangeServiceClient.java | 5 ++--
.../iotdb/commons/client/ClientManagerTest.java | 4 +++-
.../iotdb/db/client/DataNodeClientPoolFactory.java | 28 +++++++---------------
11 files changed, 62 insertions(+), 44 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/AsyncMultiLeaderServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/AsyncMultiLeaderServiceClient.java
index 98e2e82b18..e4a9f4e3c4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/AsyncMultiLeaderServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/AsyncMultiLeaderServiceClient.java
@@ -112,8 +112,9 @@ public class AsyncMultiLeaderServiceClient extends
MultiLeaderConsensusIService.
public Factory(
ClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager,
- ClientFactoryProperty clientFactoryProperty) {
- super(clientManager, clientFactoryProperty);
+ ClientFactoryProperty clientFactoryProperty,
+ String threadName) {
+ super(clientManager, clientFactoryProperty, threadName);
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
index e3687edf8f..44d1002943 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/MultiLeaderConsensusClientPool.java
@@ -37,6 +37,8 @@ public class MultiLeaderConsensusClientPool {
implements IClientPoolFactory<TEndPoint, AsyncMultiLeaderServiceClient> {
private final MultiLeaderConfig config;
+ private static final String MULTI_LEADER_CONSENSUS_CLIENT_POOL_THREAD_NAME
=
+ "MultiLeaderConsensusClientPool";
public AsyncMultiLeaderServiceClientPoolFactory(MultiLeaderConfig config) {
this.config = config;
@@ -53,7 +55,8 @@ public class MultiLeaderConsensusClientPool {
.setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(
config.getRpc().getSelectorNumOfClientManager())
- .build()),
+ .build(),
+ MULTI_LEADER_CONSENSUS_CLIENT_POOL_THREAD_NAME),
new
ClientPoolProperty.Builder<AsyncMultiLeaderServiceClient>().build().getConfig());
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
index b6cb485a9c..73fd0dca22 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
@@ -25,23 +25,33 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
public abstract class AsyncBaseClientFactory<K, V> extends
BaseClientFactory<K, V> {
private static final Logger logger =
LoggerFactory.getLogger(AsyncBaseClientFactory.class);
protected TAsyncClientManager[] tManagers;
protected AtomicInteger clientCnt = new AtomicInteger();
+ private static final String THRIFT_THREAD_NAME =
"TAsyncClientManager#SelectorThread";
protected AsyncBaseClientFactory(
- ClientManager<K, V> clientManager, ClientFactoryProperty
clientFactoryProperty) {
+ ClientManager<K, V> clientManager,
+ ClientFactoryProperty clientFactoryProperty,
+ String threadName) {
super(clientManager, clientFactoryProperty);
- tManagers = new
TAsyncClientManager[clientFactoryProperty.getSelectorNumOfAsyncClientPool()];
- for (int i = 0; i < tManagers.length; i++) {
- try {
- tManagers[i] = new TAsyncClientManager();
- } catch (IOException e) {
- logger.error("Cannot create Async client factory", e);
+ synchronized (this) {
+ tManagers = new
TAsyncClientManager[clientFactoryProperty.getSelectorNumOfAsyncClientPool()];
+ for (int i = 0; i < tManagers.length; i++) {
+ try {
+ tManagers[i] = new TAsyncClientManager();
+ } catch (IOException e) {
+ logger.error("Cannot create Async client factory", e);
+ }
}
+ Thread.getAllStackTraces().keySet().stream()
+ .filter(thread -> thread.getName().contains(THRIFT_THREAD_NAME))
+ .collect(Collectors.toList())
+ .forEach(thread -> thread.setName(threadName + "-selector" + "-" +
thread.getId()));
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 66f9428864..1b64be7657 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -33,6 +33,12 @@ import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
public class ClientPoolFactory {
private static final CommonConfig conf =
CommonDescriptor.getInstance().getConfig();
+ private static final String DATA_NODE_CLIENT_POOL_THREAD_NAME =
+ "AsyncDataNodeInternalServiceClientPool";
+ private static final String CONFIG_NODE_HEARTBEAT_CLIENT_POOL_THREAD_NAME =
+ "AsyncConfigNodeHeartbeatServiceClientPool";
+ private static final String DATA_NODE_HEARTBEAT_CLIENT_POOL_THREAD_NAME =
+ "AsyncDataNodeHeartbeatServiceClientPool";
private ClientPoolFactory() {}
@@ -65,7 +71,8 @@ public class ClientPoolFactory {
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
- .build()),
+ .build(),
+ DATA_NODE_CLIENT_POOL_THREAD_NAME),
new
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
}
}
@@ -82,7 +89,8 @@ public class ClientPoolFactory {
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
- .build()),
+ .build(),
+ CONFIG_NODE_HEARTBEAT_CLIENT_POOL_THREAD_NAME),
new
ClientPoolProperty.Builder<AsyncConfigNodeHeartbeatServiceClient>()
.build()
.getConfig());
@@ -101,7 +109,8 @@ public class ClientPoolFactory {
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
- .build()),
+ .build(),
+ DATA_NODE_HEARTBEAT_CLIENT_POOL_THREAD_NAME),
new ClientPoolProperty.Builder<AsyncDataNodeHeartbeatServiceClient>()
.build()
.getConfig());
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
index 65fa38a873..38b12d2370 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
@@ -107,8 +107,9 @@ public class AsyncConfigNodeHeartbeatServiceClient extends
IConfigNodeRPCService
public Factory(
ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient>
clientManager,
- ClientFactoryProperty clientFactoryProperty) {
- super(clientManager, clientFactoryProperty);
+ ClientFactoryProperty clientFactoryProperty,
+ String threadName) {
+ super(clientManager, clientFactoryProperty, threadName);
}
@Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
index 5b51d845c6..32219dcbfb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
@@ -112,8 +112,9 @@ public class AsyncConfigNodeIServiceClient extends
IConfigNodeRPCService.AsyncCl
public Factory(
ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> clientManager,
- ClientFactoryProperty clientFactoryProperty) {
- super(clientManager, clientFactoryProperty);
+ ClientFactoryProperty clientFactoryProperty,
+ String threadName) {
+ super(clientManager, clientFactoryProperty, threadName);
}
@Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
index 41054e165d..2812886336 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
@@ -107,8 +107,9 @@ public class AsyncDataNodeHeartbeatServiceClient extends
IDataNodeRPCService.Asy
public Factory(
ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient>
clientManager,
- ClientFactoryProperty clientFactoryProperty) {
- super(clientManager, clientFactoryProperty);
+ ClientFactoryProperty clientFactoryProperty,
+ String threadName) {
+ super(clientManager, clientFactoryProperty, threadName);
}
@Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 3a4fe1f68e..1cb0529971 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -124,8 +124,9 @@ public class AsyncDataNodeInternalServiceClient extends
IDataNodeRPCService.Asyn
public Factory(
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
clientManager,
- ClientFactoryProperty clientFactoryProperty) {
- super(clientManager, clientFactoryProperty);
+ ClientFactoryProperty clientFactoryProperty,
+ String threadName) {
+ super(clientManager, clientFactoryProperty, threadName);
}
@Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
index 01623e6b81..667f3189b1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
@@ -113,8 +113,9 @@ public class AsyncDataNodeMPPDataExchangeServiceClient
extends MPPDataExchangeSe
public Factory(
ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient>
clientManager,
- ClientFactoryProperty clientFactoryProperty) {
- super(clientManager, clientFactoryProperty);
+ ClientFactoryProperty clientFactoryProperty,
+ String threadName) {
+ super(clientManager, clientFactoryProperty, threadName);
}
@Override
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
index d0bb75ae50..05169af548 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
@@ -447,7 +447,9 @@ public class ClientManagerTest {
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new AsyncDataNodeInternalServiceClient.Factory(
- manager, new ClientFactoryProperty.Builder().build()),
+ manager,
+ new ClientFactoryProperty.Builder().build(),
+ "AsyncDataNodeInternalServiceClientPool"),
new
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
index f47fba59d5..b405101b2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ClientPoolProperty;
import org.apache.iotdb.commons.client.IClientPoolFactory;
import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
-import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import
org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
@@ -40,6 +39,10 @@ import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
public class DataNodeClientPoolFactory {
private static final IoTDBConfig conf =
IoTDBDescriptor.getInstance().getConfig();
+ private static final String CONFIG_NODE_CLIENT_POOL_THREAD_NAME =
+ "AsyncConfigNodeIServiceClientPool";
+ private static final String
DATA_NODE_MPP_DATA_EXCHANGE_CLIENT_POOL_THREAD_NAME =
+ "AsyncDataNodeMPPDataExchangeServiceClientPool";
private DataNodeClientPoolFactory() {}
@@ -72,7 +75,8 @@ public class DataNodeClientPoolFactory {
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
- .build()),
+ .build(),
+ CONFIG_NODE_CLIENT_POOL_THREAD_NAME),
new
ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>().build().getConfig());
}
}
@@ -94,23 +98,6 @@ public class DataNodeClientPoolFactory {
}
}
- public static class AsyncDataNodeInternalServiceClientPoolFactory
- implements IClientPoolFactory<TEndPoint,
AsyncDataNodeInternalServiceClient> {
- @Override
- public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient>
createClientPool(
- ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
- return new GenericKeyedObjectPool<>(
- new AsyncDataNodeInternalServiceClient.Factory(
- manager,
- new ClientFactoryProperty.Builder()
- .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
-
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
-
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
- .build()),
- new
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
- }
- }
-
public static class SyncDataNodeMPPDataExchangeServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint,
SyncDataNodeMPPDataExchangeServiceClient> {
@Override
@@ -142,7 +129,8 @@ public class DataNodeClientPoolFactory {
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
- .build()),
+ .build(),
+ DATA_NODE_MPP_DATA_EXCHANGE_CLIENT_POOL_THREAD_NAME),
new
ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
.build()
.getConfig());