This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch jira5425 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 93230e34d9e7988b52c9f7803d682b727f222579 Author: Potato <[email protected]> AuthorDate: Sat Jan 28 20:35:39 2023 +0800 [IOTDB-5425] Consolidate all ConfigNodeClient to be managed by clientManager (#8891) --- .../apache/iotdb/commons/client/ClientManager.java | 13 ++++----- .../apache/iotdb/db/client/ConfigNodeClient.java | 33 ++++++---------------- .../java/org/apache/iotdb/db/service/DataNode.java | 28 +++++++++++------- .../db/service/DataNodeServerCommandLine.java | 17 +++++++---- .../iotdb/db/service/RegionMigrateService.java | 14 +++++++-- 5 files changed, 55 insertions(+), 50 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java index 770c729bbe..3ee338d4ec 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java @@ -57,9 +57,10 @@ public class ClientManager<K, V> implements IClientManager<K, V> { } /** - * return a client V for node K to the ClientManager Note: We do not define this interface in - * IClientManager to make you aware that the return of a client is automatic whenever a particular - * client is used. + * return a client V for node K to the ClientManager. + * + * <p>Note: We do not define this interface in IClientManager to make you aware that the return of + * a client is automatic whenever a particular client is used. */ public void returnClient(K node, V client) { Optional.ofNullable(node) @@ -89,10 +90,6 @@ public class ClientManager<K, V> implements IClientManager<K, V> { @Override public void close() { - try { - pool.close(); - } catch (Exception e) { - logger.warn("Close client pool failed", e); - } + pool.close(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java index 702818e2ef..ffd25ad9c0 100644 --- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java +++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java @@ -27,10 +27,8 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq; import org.apache.iotdb.commons.client.ClientManager; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.factory.ThriftClientFactory; -import org.apache.iotdb.commons.client.property.ClientPoolProperty; import org.apache.iotdb.commons.client.property.ThriftClientProperty; import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler; -import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConfigNodeRegionId; import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq; @@ -118,8 +116,6 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; @@ -141,7 +137,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie private static final int RETRY_INTERVAL_MS = 1000; - private long connectionTimeout = ClientPoolProperty.DefaultProperty.WAIT_CLIENT_TIMEOUT_MS; + private final long connectionTimeout; private IConfigNodeRPCService.Iface client; @@ -163,23 +159,13 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie TProtocolFactory protocolFactory; - public ConfigNodeClient() throws TException { - // Read config nodes from configuration - configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes(); - protocolFactory = - CommonDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled() - ? new TCompactProtocol.Factory() - : new TBinaryProtocol.Factory(); - - init(); - } - public ConfigNodeClient( + List<TEndPoint> configNodes, TProtocolFactory protocolFactory, long connectionTimeout, ClientManager<ConfigNodeRegionId, ConfigNodeClient> clientManager) throws TException { - configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes(); + this.configNodes = configNodes; this.protocolFactory = protocolFactory; this.connectionTimeout = connectionTimeout; this.clientManager = clientManager; @@ -274,11 +260,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie @Override public void close() { - if (clientManager != null) { - clientManager.returnClient(configNodeRegionId, this); - } else { - invalidate(); - } + clientManager.returnClient(configNodeRegionId, this); } @Override @@ -1859,7 +1841,8 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie SyncThriftClientWithErrorHandler.newErrorHandler( ConfigNodeClient.class, ConfigNodeClient.class.getConstructor( - TProtocolFactory.class, long.class, clientManager.getClass()), + List.class, TProtocolFactory.class, long.class, clientManager.getClass()), + ConfigNodeInfo.getInstance().getLatestConfigNodes(), thriftClientProperty.getProtocolFactory(), thriftClientProperty.getConnectionTimeoutMs(), clientManager)); @@ -1868,7 +1851,9 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie @Override public boolean validateObject( ConfigNodeRegionId configNodeRegionId, PooledObject<ConfigNodeClient> pooledObject) { - return pooledObject.getObject() != null && pooledObject.getObject().getTransport().isOpen(); + return Optional.ofNullable(pooledObject.getObject().getTransport()) + .map(TTransport::isOpen) + .orElse(false); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java index 3c5f1e49dd..4512b4ea05 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TNodeResource; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; @@ -51,6 +52,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration; import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.client.ConfigNodeClient; +import org.apache.iotdb.db.client.ConfigNodeClientManager; import org.apache.iotdb.db.client.ConfigNodeInfo; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -100,6 +102,7 @@ import java.util.stream.Collectors; import static org.apache.iotdb.commons.conf.IoTDBConstant.DEFAULT_CLUSTER_NAME; public class DataNode implements DataNodeMBean { + private static final Logger logger = LoggerFactory.getLogger(DataNode.class); private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -232,10 +235,11 @@ public class DataNode implements DataNodeMBean { int retry = DEFAULT_RETRY; TSystemConfigurationResp configurationResp = null; while (retry > 0) { - try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) { + try (ConfigNodeClient configNodeClient = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) { configurationResp = configNodeClient.getSystemConfiguration(); break; - } catch (TException e) { + } catch (TException | ClientManagerException e) { // Read ConfigNodes from system.properties and retry logger.warn( "Cannot pull system configurations from ConfigNode-leader, because: {}", @@ -344,10 +348,11 @@ public class DataNode implements DataNodeMBean { req.setClusterName(config.getClusterName()); TDataNodeRegisterResp dataNodeRegisterResp = null; while (retry > 0) { - try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) { + try (ConfigNodeClient configNodeClient = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) { dataNodeRegisterResp = configNodeClient.registerDataNode(req); break; - } catch (TException e) { + } catch (TException | ClientManagerException e) { // Read ConfigNodes from system.properties and retry logger.warn("Cannot register to the cluster, because: {}", e.getMessage()); ConfigNodeInfo.getInstance().loadConfigNodeList(); @@ -403,10 +408,11 @@ public class DataNode implements DataNodeMBean { req.setDataNodeConfiguration(generateDataNodeConfiguration()); TDataNodeRestartResp dataNodeRestartResp = null; while (retry > 0) { - try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) { + try (ConfigNodeClient configNodeClient = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) { dataNodeRestartResp = configNodeClient.restartDataNode(req); break; - } catch (TException e) { + } catch (TException | ClientManagerException e) { // Read ConfigNodes from system.properties and retry logger.warn( "Cannot send restart request to the ConfigNode-leader, because: {}", e.getMessage()); @@ -652,7 +658,8 @@ public class DataNode implements DataNodeMBean { } private void getJarOfUDFs(List<UDFInformation> udfInformationList) throws StartupException { - try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) { + try (ConfigNodeClient configNodeClient = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) { List<String> jarNameList = udfInformationList.stream().map(UDFInformation::getJarName).collect(Collectors.toList()); TGetJarInListResp resp = configNodeClient.getUDFJar(new TGetJarInListReq(jarNameList)); @@ -664,7 +671,7 @@ public class DataNode implements DataNodeMBean { UDFExecutableManager.getInstance() .saveToInstallDir(jarList.get(i), udfInformationList.get(i).getJarName()); } - } catch (IOException | TException e) { + } catch (IOException | TException | ClientManagerException e) { throw new StartupException(e); } } @@ -761,7 +768,8 @@ public class DataNode implements DataNodeMBean { private void getJarOfTriggers(List<TriggerInformation> triggerInformationList) throws StartupException { - try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) { + try (ConfigNodeClient configNodeClient = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) { List<String> jarNameList = triggerInformationList.stream() .map(TriggerInformation::getJarName) @@ -775,7 +783,7 @@ public class DataNode implements DataNodeMBean { TriggerExecutableManager.getInstance() .saveToInstallDir(jarList.get(i), triggerInformationList.get(i).getJarName()); } - } catch (IOException | TException e) { + } catch (IOException | TException | ClientManagerException e) { throw new StartupException(e); } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java index 2b254b2296..a1a6d1621a 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java +++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java @@ -22,12 +22,14 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.ServerCommandLine; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.exception.BadNodeUrlException; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp; import org.apache.iotdb.db.client.ConfigNodeClient; +import org.apache.iotdb.db.client.ConfigNodeClientManager; import org.apache.iotdb.db.client.ConfigNodeInfo; import org.apache.iotdb.rpc.TSStatusCode; @@ -91,7 +93,7 @@ public class DataNodeServerCommandLine extends ServerCommandLine { * @param args id or ip:rpc_port for removed datanode */ private void doRemoveDataNode(String[] args) - throws BadNodeUrlException, TException, IoTDBException { + throws BadNodeUrlException, TException, IoTDBException, ClientManagerException { if (args.length != 2) { LOGGER.info("Usage: <node-id>/<ip>:<rpc-port>"); @@ -109,7 +111,8 @@ public class DataNodeServerCommandLine extends ServerCommandLine { } LOGGER.info("Start to remove datanode, removed datanode endpoints: {}", dataNodeLocations); TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(dataNodeLocations); - try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) { + try (ConfigNodeClient configNodeClient = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) { TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq); LOGGER.info("Remove result {} ", removeResp); if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { @@ -145,17 +148,19 @@ public class DataNodeServerCommandLine extends ServerCommandLine { // Below supports multiple datanode deletion, split by ',', and is reserved for extension try { List<TEndPoint> endPoints = NodeUrlUtils.parseTEndPointUrls(args); - try (ConfigNodeClient client = new ConfigNodeClient()) { + try (ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) { dataNodeLocations = client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream() .map(TDataNodeConfiguration::getLocation) .filter(location -> endPoints.contains(location.getClientRpcEndPoint())) .collect(Collectors.toList()); - } catch (TException e) { + } catch (TException | ClientManagerException e) { LOGGER.error("Get data node locations failed", e); } } catch (BadNodeUrlException e) { - try (ConfigNodeClient client = new ConfigNodeClient()) { + try (ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) { for (String id : args.split(",")) { if (!isNumeric(id)) { LOGGER.warn("Incorrect id format {}, skipped...", id); @@ -174,7 +179,7 @@ public class DataNodeServerCommandLine extends ServerCommandLine { dataNodeLocations.add(nodeLocationResult.get(0)); } } - } catch (TException e1) { + } catch (TException | ClientManagerException e1) { LOGGER.error("Get data node locations failed", e); } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java index 81d404bc1b..3667219739 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java +++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; @@ -34,6 +35,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; import org.apache.iotdb.db.client.ConfigNodeClient; +import org.apache.iotdb.db.client.ConfigNodeClientManager; +import org.apache.iotdb.db.client.ConfigNodeInfo; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; import org.apache.iotdb.db.engine.StorageEngineV2; @@ -50,6 +53,7 @@ import java.util.HashMap; import java.util.Map; public class RegionMigrateService implements IService { + private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateService.class); public static final String REGION_MIGRATE_PROCESS = "[REGION_MIGRATE_PROCESS]"; @@ -153,6 +157,7 @@ public class RegionMigrateService implements IService { } private static class RegionMigratePool extends AbstractPoolManager { + private final Logger poolLogger = LoggerFactory.getLogger(RegionMigratePool.class); private RegionMigratePool() { @@ -178,6 +183,7 @@ public class RegionMigrateService implements IService { } private static class AddRegionPeerTask implements Runnable { + private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class); // The RegionGroup that shall perform the add peer process @@ -266,6 +272,7 @@ public class RegionMigrateService implements IService { } private static class RemoveRegionPeerTask implements Runnable { + private static final Logger taskLogger = LoggerFactory.getLogger(RemoveRegionPeerTask.class); private final TConsensusGroupId tRegionId; @@ -354,6 +361,7 @@ public class RegionMigrateService implements IService { } private static class DeleteOldRegionPeerTask implements Runnable { + private static final Logger taskLogger = LoggerFactory.getLogger(DeleteOldRegionPeerTask.class); private final TConsensusGroupId tRegionId; @@ -453,6 +461,7 @@ public class RegionMigrateService implements IService { } private static class Holder { + private static final RegionMigrateService INSTANCE = new RegionMigrateService(); private Holder() {} @@ -497,9 +506,10 @@ public class RegionMigrateService implements IService { } private static void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req) - throws TException { + throws TException, ClientManagerException { TSStatus status; - try (ConfigNodeClient client = new ConfigNodeClient()) { + try (ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) { status = client.reportRegionMigrateResult(req); LOGGER.info( "{}, Report region {} migrate result {} to Config node succeed, result: {}",
