This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch fix-rm-ain in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fe9019fa938dffd470400ad2e6346d43b0e2e780 Author: Yongzao <[email protected]> AuthorDate: Tue Jan 27 16:07:29 2026 +0800 finish --- .../client/sync/CnToAnSyncRequestType.java | 25 ++++ .../client/sync/SyncAINodeClientPool.java | 151 ++++++++++++++++++++ .../procedure/impl/node/RemoveAINodeProcedure.java | 22 ++- .../iotdb/commons/client/ClientPoolFactory.java | 22 +++ .../commons/client/sync/SyncAINodeClient.java | 153 +++++++++++++++++++++ 5 files changed, 360 insertions(+), 13 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToAnSyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToAnSyncRequestType.java new file mode 100644 index 00000000000..9636d2ecf5f --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToAnSyncRequestType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Types of synchronous requests that can be dispatched to an AINode through the sync client pool
 * in this package. Each constant names one remote operation.
 */
public enum CnToAnSyncRequestType {
  // Node Maintenance
  /** Asks the target AINode to stop. */
  STOP_AI_NODE,
}
+ */ + +package org.apache.iotdb.confignode.client.sync; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.ClientPoolFactory; +import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.sync.SyncAINodeClient; +import org.apache.iotdb.commons.exception.UncheckedStartupException; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.common.collect.ImmutableMap; +import org.apache.ratis.util.function.CheckedBiFunction; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class SyncAINodeClientPool { + + private static final Logger LOGGER = LoggerFactory.getLogger(SyncAINodeClientPool.class); + + private static final int DEFAULT_RETRY_NUM = 10; + + private final IClientManager<TEndPoint, SyncAINodeClient> clientManager; + + protected ImmutableMap< + CnToAnSyncRequestType, CheckedBiFunction<Object, SyncAINodeClient, Object, Exception>> + actionMap; + + private SyncAINodeClientPool() { + clientManager = + new IClientManager.Factory<TEndPoint, SyncAINodeClient>() + .createClientManager(new ClientPoolFactory.SyncAINodeClientPoolFactory()); + buildActionMap(); + checkActionMapCompleteness(); + } + + private void buildActionMap() { + ImmutableMap.Builder< + CnToAnSyncRequestType, CheckedBiFunction<Object, SyncAINodeClient, Object, Exception>> + actionMapBuilder = ImmutableMap.builder(); + actionMapBuilder.put(CnToAnSyncRequestType.STOP_AI_NODE, (req, client) -> client.stopAINode()); + actionMap = actionMapBuilder.build(); + } + + private void checkActionMapCompleteness() { + List<CnToAnSyncRequestType> lackList = + Arrays.stream(CnToAnSyncRequestType.values()) + .filter(type -> !actionMap.containsKey(type)) + 
.collect(Collectors.toList()); + if (!lackList.isEmpty()) { + throw new UncheckedStartupException( + String.format("These request types should be added to actionMap: %s", lackList)); + } + } + + public Object sendSyncRequestToAINodeWithRetry( + TEndPoint endPoint, Object req, CnToAnSyncRequestType requestType) { + Throwable lastException = new TException(); + for (int retry = 0; retry < DEFAULT_RETRY_NUM; retry++) { + try (SyncAINodeClient client = clientManager.borrowClient(endPoint)) { + return executeSyncRequest(requestType, client, req); + } catch (Exception e) { + lastException = e; + if (retry != DEFAULT_RETRY_NUM - 1) { + LOGGER.warn("{} failed on AINode {}, retrying {}...", requestType, endPoint, retry + 1); + doRetryWait(retry); + } + } + } + LOGGER.error("{} failed on AINode {}", requestType, endPoint, lastException); + return new TSStatus(TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode()) + .setMessage("All retry failed due to: " + lastException.getMessage()); + } + + public Object sendSyncRequestToAINodeWithGivenRetry( + TEndPoint endPoint, Object req, CnToAnSyncRequestType requestType, int retryNum) { + Throwable lastException = new TException(); + for (int retry = 0; retry < retryNum; retry++) { + try (SyncAINodeClient client = clientManager.borrowClient(endPoint)) { + return executeSyncRequest(requestType, client, req); + } catch (Exception e) { + lastException = e; + if (retry != retryNum - 1) { + LOGGER.warn("{} failed on AINode {}, retrying {}...", requestType, endPoint, retry + 1); + doRetryWait(retry); + } + } + } + LOGGER.error("{} failed on AINode {}", requestType, endPoint, lastException); + return new TSStatus(TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode()) + .setMessage("All retry failed due to: " + lastException.getMessage()); + } + + private Object executeSyncRequest( + CnToAnSyncRequestType requestType, SyncAINodeClient client, Object req) throws Exception { + return 
Objects.requireNonNull(actionMap.get(requestType)).apply(req, client); + } + + private void doRetryWait(int retryNum) { + try { + if (retryNum < 3) { + TimeUnit.MILLISECONDS.sleep(800L); + } else if (retryNum < 5) { + TimeUnit.MILLISECONDS.sleep(100L * (long) Math.pow(2, retryNum)); + } else { + TimeUnit.MILLISECONDS.sleep(3200L); + } + } catch (InterruptedException e) { + LOGGER.warn("Retry wait failed.", e); + Thread.currentThread().interrupt(); + } + } + + private static class ClientPoolHolder { + + private static final SyncAINodeClientPool INSTANCE = new SyncAINodeClientPool(); + + private ClientPoolHolder() { + // Empty constructor + } + } + + public static SyncAINodeClientPool getInstance() { + return SyncAINodeClientPool.ClientPoolHolder.INSTANCE; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java index 2a1c6881b14..facc2ce4ab7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveAINodeProcedure.java @@ -22,13 +22,13 @@ package org.apache.iotdb.confignode.procedure.impl.node; import org.apache.iotdb.common.rpc.thrift.TAINodeLocation; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; +import org.apache.iotdb.confignode.client.sync.CnToAnSyncRequestType; +import org.apache.iotdb.confignode.client.sync.SyncAINodeClientPool; import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.state.RemoveAINodeState; import 
org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.db.protocol.client.an.AINodeClient; -import org.apache.iotdb.db.protocol.client.an.AINodeClientManager; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -65,16 +65,13 @@ public class RemoveAINodeProcedure extends AbstractNodeProcedure<RemoveAINodeSta try { switch (state) { case NODE_STOP: - TSStatus resp = null; - try (AINodeClient client = - AINodeClientManager.getInstance() - .borrowClient(AINodeClientManager.AINODE_ID_PLACEHOLDER)) { - resp = client.stopAINode(); - } catch (Exception e) { - LOGGER.warn( - "Failed to stop AINode {}, but the remove process will continue.", - removedAINode.getInternalEndPoint()); - } + TSStatus resp = + (TSStatus) + SyncAINodeClientPool.getInstance() + .sendSyncRequestToAINodeWithRetry( + removedAINode.getInternalEndPoint(), + null, + CnToAnSyncRequestType.STOP_AI_NODE); if (resp != null && resp.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info("Successfully stopped AINode {}", removedAINode.getInternalEndPoint()); } else { @@ -92,7 +89,6 @@ public class RemoveAINodeProcedure extends AbstractNodeProcedure<RemoveAINodeSta env.getConfigManager() .getConsensusManager() .write(new RemoveAINodePlan(removedAINode)); - if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new ProcedureException( String.format( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index d09b9245e84..7ed34359b21 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -31,6 +31,7 @@ import org.apache.iotdb.commons.client.property.ClientPoolProperty; import 
org.apache.iotdb.commons.client.property.PipeConsensusClientProperty; import org.apache.iotdb.commons.client.property.ThriftClientProperty; import org.apache.iotdb.commons.client.property.ThriftClientProperty.DefaultProperty; +import org.apache.iotdb.commons.client.sync.SyncAINodeClient; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient; @@ -393,6 +394,27 @@ public class ClientPoolFactory { } } + public static class SyncAINodeClientPoolFactory + implements IClientPoolFactory<TEndPoint, SyncAINodeClient> { + + @Override + public GenericKeyedObjectPool<TEndPoint, SyncAINodeClient> createClientPool( + ClientManager<TEndPoint, SyncAINodeClient> manager) { + GenericKeyedObjectPool<TEndPoint, SyncAINodeClient> clientPool = + new GenericKeyedObjectPool<>( + new SyncAINodeClient.Factory( + manager, + new ThriftClientProperty.Builder() + .setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS()) + .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) + .build()), + new ClientPoolProperty.Builder<SyncAINodeClient>().build().getConfig()); + ClientManagerMetrics.getInstance() + .registerClientManager(this.getClass().getSimpleName(), clientPool); + return clientPool; + } + } + public static class AsyncAINodeHeartbeatServiceClientPoolFactory implements IClientPoolFactory<TEndPoint, AsyncAINodeInternalServiceClient> { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncAINodeClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncAINodeClient.java new file mode 100644 index 00000000000..054b6446099 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncAINodeClient.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.client.sync; + +import org.apache.iotdb.ainode.rpc.thrift.IAINodeRPCService; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.client.ClientManager; +import org.apache.iotdb.commons.client.ThriftClient; +import org.apache.iotdb.commons.client.factory.ThriftClientFactory; +import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory; +import org.apache.iotdb.rpc.TConfigurationConst; +import org.apache.iotdb.rpc.TimeoutChangeableTransport; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransportException; + +import java.net.SocketException; + +public class SyncAINodeClient extends IAINodeRPCService.Client + implements ThriftClient, AutoCloseable { + + private final boolean printLogWhenEncounterException; + private final TEndPoint endpoint; + private final ClientManager<TEndPoint, SyncAINodeClient> clientManager; + private static final CommonConfig 
COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); + + public SyncAINodeClient( + ThriftClientProperty property, + TEndPoint endpoint, + ClientManager<TEndPoint, SyncAINodeClient> clientManager) + throws TTransportException { + super( + property + .getProtocolFactory() + .getProtocol( + COMMON_CONFIG.isEnableInternalSSL() + ? DeepCopyRpcTransportFactory.INSTANCE.getTransport( + endpoint.getIp(), + endpoint.getPort(), + property.getConnectionTimeoutMs(), + COMMON_CONFIG.getTrustStorePath(), + COMMON_CONFIG.getTrustStorePwd(), + COMMON_CONFIG.getKeyStorePath(), + COMMON_CONFIG.getKeyStorePwd()) + : DeepCopyRpcTransportFactory.INSTANCE.getTransport( + new TSocket( + TConfigurationConst.defaultTConfiguration, + endpoint.getIp(), + endpoint.getPort(), + property.getConnectionTimeoutMs())))); + this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); + this.endpoint = endpoint; + this.clientManager = clientManager; + if (!getInputProtocol().getTransport().isOpen()) { + getInputProtocol().getTransport().open(); + } + } + + public int getTimeout() throws SocketException { + return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut(); + } + + public void setTimeout(int timeout) { + // the same transport is used in both input and output + ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout); + } + + public TEndPoint getEndpoint() { + return endpoint; + } + + public ClientManager<TEndPoint, SyncAINodeClient> getClientManager() { + return clientManager; + } + + @Override + public void close() throws Exception { + clientManager.returnClient(endpoint, this); + } + + @Override + public void invalidate() { + getInputProtocol().getTransport().close(); + } + + @Override + public void invalidateAll() { + clientManager.clear(endpoint); + } + + @Override + public boolean printLogWhenEncounterException() { + return printLogWhenEncounterException; + } + + @Override + public String toString() { + 
return String.format("SyncAINodeClient{%s}", endpoint); + } + + public static class Factory extends ThriftClientFactory<TEndPoint, SyncAINodeClient> { + + public Factory( + ClientManager<TEndPoint, SyncAINodeClient> clientManager, + ThriftClientProperty thriftClientProperty) { + super(clientManager, thriftClientProperty); + } + + @Override + public void destroyObject(TEndPoint endpoint, PooledObject<SyncAINodeClient> pooledObject) { + pooledObject.getObject().invalidate(); + } + + @Override + public PooledObject<SyncAINodeClient> makeObject(TEndPoint endpoint) throws Exception { + return new DefaultPooledObject<>( + SyncThriftClientWithErrorHandler.newErrorHandler( + SyncAINodeClient.class, + SyncAINodeClient.class.getConstructor( + thriftClientProperty.getClass(), endpoint.getClass(), clientManager.getClass()), + thriftClientProperty, + endpoint, + clientManager)); + } + + @Override + public boolean validateObject(TEndPoint endpoint, PooledObject<SyncAINodeClient> pooledObject) { + return pooledObject.getObject().getInputProtocol().getTransport().isOpen(); + } + } +}
