This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/master2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a1ef119387b6311d056399f965f4e5cb09c980ab Author: Beyyes <[email protected]> AuthorDate: Wed Nov 9 14:02:12 2022 +0800 add sendSyncRequestToDataNodeWithGivenRetry method --- .../client/sync/SyncDataNodeClientPool.java | 98 ++++++++++++++-------- .../confignode/persistence/node/NodeInfo.java | 4 +- .../procedure/env/DataNodeRemoveHandler.java | 11 ++- .../impl/DataNodeInternalRPCServiceImpl.java | 2 +- 4 files changed, 73 insertions(+), 42 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java index d50210bb4d..f70201db65 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java @@ -50,7 +50,7 @@ public class SyncDataNodeClientPool { private static final Logger LOGGER = LoggerFactory.getLogger(SyncDataNodeClientPool.class); - private static final int retryNum = 6; + private static final int DEFAULT_RETRY_NUM = 6; private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager; @@ -64,41 +64,9 @@ public class SyncDataNodeClientPool { public TSStatus sendSyncRequestToDataNodeWithRetry( TEndPoint endPoint, Object req, DataNodeRequestType requestType) { Throwable lastException = null; - for (int retry = 0; retry < retryNum; retry++) { + for (int retry = 0; retry < DEFAULT_RETRY_NUM; retry++) { try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) { - switch (requestType) { - case INVALIDATE_PARTITION_CACHE: - return client.invalidatePartitionCache((TInvalidateCacheReq) req); - case INVALIDATE_SCHEMA_CACHE: - return client.invalidateSchemaCache((TInvalidateCacheReq) req); - case CREATE_SCHEMA_REGION: - return client.createSchemaRegion((TCreateSchemaRegionReq) req); - case CREATE_DATA_REGION: - return client.createDataRegion((TCreateDataRegionReq) req); - case DELETE_REGION: - return client.deleteRegion((TConsensusGroupId) req); - case INVALIDATE_PERMISSION_CACHE: - return client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req); - case DISABLE_DATA_NODE: - return client.disableDataNode((TDisableDataNodeReq) req); - case STOP_DATA_NODE: - return client.stopDataNode(); - case SET_SYSTEM_STATUS: - return client.setSystemStatus((String) req); - case UPDATE_TEMPLATE: - return client.updateTemplate((TUpdateTemplateReq) req); - case CREATE_NEW_REGION_PEER: - return client.createNewRegionPeer((TCreatePeerReq) req); - case ADD_REGION_PEER: - return client.addRegionPeer((TMaintainPeerReq) req); - case REMOVE_REGION_PEER: - return client.removeRegionPeer((TMaintainPeerReq) req); - case DELETE_OLD_REGION_PEER: - return client.deleteOldRegionPeer((TMaintainPeerReq) req); - default: - return RpcUtils.getStatus( - TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType); - } + return executeSyncRequest(requestType, client, req); } catch (TException | IOException e) { lastException = e; LOGGER.warn( @@ -115,6 +83,66 @@ public class SyncDataNodeClientPool { .setMessage("All retry failed due to: " + lastException.getMessage()); } + public TSStatus sendSyncRequestToDataNodeWithGivenRetry( + TEndPoint endPoint, Object req, DataNodeRequestType requestType, int retryNum) { + Throwable lastException = new TException(); + for (int retry = 0; retry < retryNum; retry++) { + try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) { + return executeSyncRequest(requestType, client, req); + } catch (TException | IOException e) { + lastException = e; + LOGGER.warn( + "{} failed on DataNode {}, because {}, retrying {}...", + requestType, + endPoint, + e.getMessage(), + retry); + doRetryWait(retry); + } + } + LOGGER.error("{} failed on DataNode {}", requestType, endPoint, lastException); + return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode()) + .setMessage("All retry failed due to: " + lastException.getMessage()); + } + + private TSStatus executeSyncRequest(DataNodeRequestType requestType, + SyncDataNodeInternalServiceClient client, + Object req) throws TException { + switch (requestType) { + case INVALIDATE_PARTITION_CACHE: + return client.invalidatePartitionCache((TInvalidateCacheReq) req); + case INVALIDATE_SCHEMA_CACHE: + return client.invalidateSchemaCache((TInvalidateCacheReq) req); + case CREATE_SCHEMA_REGION: + return client.createSchemaRegion((TCreateSchemaRegionReq) req); + case CREATE_DATA_REGION: + return client.createDataRegion((TCreateDataRegionReq) req); + case DELETE_REGION: + return client.deleteRegion((TConsensusGroupId) req); + case INVALIDATE_PERMISSION_CACHE: + return client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req); + case DISABLE_DATA_NODE: + return client.disableDataNode((TDisableDataNodeReq) req); + case STOP_DATA_NODE: + return client.stopDataNode(); + case SET_SYSTEM_STATUS: + return client.setSystemStatus((String) req); + case UPDATE_TEMPLATE: + return client.updateTemplate((TUpdateTemplateReq) req); + case CREATE_NEW_REGION_PEER: + return client.createNewRegionPeer((TCreatePeerReq) req); + case ADD_REGION_PEER: + return client.addRegionPeer((TMaintainPeerReq) req); + case REMOVE_REGION_PEER: + return client.removeRegionPeer((TMaintainPeerReq) req); + case DELETE_OLD_REGION_PEER: + return client.deleteOldRegionPeer((TMaintainPeerReq) req); + default: + return RpcUtils.getStatus( + TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType); + } + } + private void doRetryWait(int retryNum) { try { TimeUnit.MILLISECONDS.sleep(100L * (long) Math.pow(2, retryNum)); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java index ad2f4d23b3..cb6e0f22ca 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/node/NodeInfo.java @@ -165,7 +165,7 @@ public class NodeInfo implements SnapshotProcessor { */ public TSStatus removeDataNode(RemoveDataNodePlan req) { LOGGER.info( - "{}, There are {} data node in cluster before executed remove-datanode.sh", + "{}, There are {} data node in cluster before executed RemoveDataNodePlan", REMOVE_DATANODE_PROCESS, registeredDataNodes.size()); @@ -181,7 +181,7 @@ public class NodeInfo implements SnapshotProcessor { dataNodeInfoReadWriteLock.writeLock().unlock(); } LOGGER.info( - "{}, There are {} data node in cluster after executed remove-datanode.sh", + "{}, There are {} data node in cluster after executed RemoveDataNodePlan", REMOVE_DATANODE_PROCESS, registeredDataNodes.size()); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java index 2e29166188..525d3fb783 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java @@ -412,14 +412,17 @@ public class DataNodeRemoveHandler { * @param dataNode old data node */ public void stopDataNode(TDataNodeLocation dataNode) { - LOGGER.info("{}, Begin to stop DataNode {}", REMOVE_DATANODE_PROCESS, dataNode); + LOGGER.info( + "{}, Begin to stop DataNode and kill the DataNode process {}", + REMOVE_DATANODE_PROCESS, + dataNode); AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint()); TSStatus status = SyncDataNodeClientPool.getInstance() - .sendSyncRequestToDataNodeWithRetry( - dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE); + .sendSyncRequestToDataNodeWithGivenRetry( + dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE, 2); configManager.getNodeManager().removeNodeCache(dataNode.getDataNodeId()); - LOGGER.info("{}, Stop Data Node {} result: {}", REMOVE_DATANODE_PROCESS, dataNode, status); + LOGGER.info("{}, Stop Data Node result: {}, stoppedDataNode: {}", REMOVE_DATANODE_PROCESS, status, dataNode); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java index ecade45ea8..cef5dda081 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -1482,7 +1482,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface DataNode.getInstance().stop(); status.setMessage("stop datanode succeed"); } catch (Exception e) { - LOGGER.error("stop Data Node error", e); + LOGGER.error("Stop Data Node error", e); status.setCode(TSStatusCode.DATANODE_STOP_ERROR.getStatusCode()); status.setMessage(e.getMessage()); }
