This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/master in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 27a9fb4133ffbdf6fe51b403240dc56eaf6a5775 Author: Beyyes <[email protected]> AuthorDate: Sun Nov 13 16:13:14 2022 +0800 Do not stop the DataNode when some regions fail to migrate --- .../iotdb/confignode/manager/ProcedureManager.java | 2 +- .../iotdb/confignode/manager/node/NodeManager.java | 11 +- .../procedure/env/DataNodeRemoveHandler.java | 12 +-- .../impl/node/RemoveDataNodeProcedure.java | 120 ++++++++++++--------- .../impl/statemachine/RegionMigrateProcedure.java | 7 +- 5 files changed, 89 insertions(+), 63 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 4deef73d26..79ecc64828 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -326,7 +326,7 @@ public class ProcedureManager { .forEach( tDataNodeLocation -> { this.executor.submitProcedure(new RemoveDataNodeProcedure(tDataNodeLocation)); - LOGGER.info("Submit to remove data node procedure, {}", tDataNodeLocation); + LOGGER.info("Submit RemoveDataNodeProcedure successfully, {}", tDataNodeLocation); }); return true; } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 3db90ba4fc..f71a481a25 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -279,14 +279,14 @@ public class NodeManager { dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan); if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.error( - "The remove DataNode request check failed. 
req: {}, check result: {}", + "The remove DataNode request check failed. req: {}, check result: {}", removeDataNodePlan, preCheckStatus.getStatus()); return preCheckStatus; } + // Do transfer of the DataNodes before remove DataNodeToStatusResp dataSet = new DataNodeToStatusResp(); - // do transfer of the DataNodes before remove if (configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { dataSet.setStatus( @@ -294,7 +294,8 @@ public class NodeManager { .setMessage("Fail to do transfer of the DataNodes")); return dataSet; } - // if add request to queue, then return to client + + // Add request to queue, then return to client boolean registerSucceed = configManager.getProcedureManager().removeDataNode(removeDataNodePlan); TSStatus status; @@ -307,7 +308,9 @@ public class NodeManager { } dataSet.setStatus(status); - LOGGER.info("NodeManager finished to remove DataNode {}", removeDataNodePlan); + LOGGER.info( + "NodeManager submit RemoveDataNodePlan finished, removeDataNodePlan: {}", + removeDataNodePlan); return dataSet; } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java index 6660c0c080..7ae18943e6 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java @@ -77,15 +77,15 @@ public class DataNodeRemoveHandler { /** * Get all consensus group id in this node * - * @param dataNodeLocation data node location - * @return group id list + * @param removedDataNode the DataNode to be removed + * @return group id list to be migrated */ - public List<TConsensusGroupId> getDataNodeRegionIds(TDataNodeLocation dataNodeLocation) { + public List<TConsensusGroupId> getMigratedDataNodeRegions(TDataNodeLocation 
removedDataNode) { return configManager.getPartitionManager().getAllReplicaSets().stream() .filter( - rg -> - rg.getDataNodeLocations().contains(dataNodeLocation) - && rg.regionId.getType() != TConsensusGroupType.ConfigNodeRegion) + replicaSet -> + replicaSet.getDataNodeLocations().contains(removedDataNode) + && replicaSet.regionId.getType() != TConsensusGroupType.ConfigNodeRegion) .map(TRegionReplicaSet::getRegionId) .collect(Collectors.toList()); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java index 3380aa3c18..fc432bb966 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java @@ -21,9 +21,11 @@ package org.apache.iotdb.confignode.procedure.impl.node; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure; import org.apache.iotdb.confignode.procedure.state.RemoveDataNodeState; @@ -37,6 +39,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS; @@ -45,24 +48,26 @@ public class 
RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod private static final Logger LOG = LoggerFactory.getLogger(RemoveDataNodeProcedure.class); private static final int RETRY_THRESHOLD = 5; - private TDataNodeLocation disableDataNodeLocation; + private TDataNodeLocation removedDataNode; - private List<TConsensusGroupId> execDataNodeRegionIds = new ArrayList<>(); + private List<TConsensusGroupId> migratedDataNodeRegions = new ArrayList<>(); public RemoveDataNodeProcedure() { super(); } - public RemoveDataNodeProcedure(TDataNodeLocation disableDataNodeLocation) { + public RemoveDataNodeProcedure(TDataNodeLocation removedDataNode) { super(); - this.disableDataNodeLocation = disableDataNodeLocation; + this.removedDataNode = removedDataNode; } @Override protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState state) { - if (disableDataNodeLocation == null) { + if (removedDataNode == null) { return Flow.NO_MORE_STATE; } + + DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler(); try { switch (state) { case REGION_REPLICA_CHECK: @@ -70,24 +75,24 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod setNextState(RemoveDataNodeState.REMOVE_DATA_NODE_PREPARE); } else { LOG.error( - "{}, Can not remove DataNode {} because the number of DataNodes is less or equal than region replica number", + "{}, Can not remove DataNode {} " + + "because the number of DataNodes is less or equal than region replica number", REMOVE_DATANODE_PROCESS, - disableDataNodeLocation); + removedDataNode); return Flow.NO_MORE_STATE; } case REMOVE_DATA_NODE_PREPARE: // mark the datanode as removing status and broadcast region route map - env.markDataNodeAsRemovingAndBroadcast(disableDataNodeLocation); - execDataNodeRegionIds = - env.getDataNodeRemoveHandler().getDataNodeRegionIds(disableDataNodeLocation); + env.markDataNodeAsRemovingAndBroadcast(removedDataNode); + migratedDataNodeRegions = 
handler.getMigratedDataNodeRegions(removedDataNode); LOG.info( "{}, DataNode regions to be removed is {}", REMOVE_DATANODE_PROCESS, - execDataNodeRegionIds); + migratedDataNodeRegions); setNextState(RemoveDataNodeState.BROADCAST_DISABLE_DATA_NODE); break; case BROADCAST_DISABLE_DATA_NODE: - env.getDataNodeRemoveHandler().broadcastDisableDataNode(disableDataNodeLocation); + handler.broadcastDisableDataNode(removedDataNode); setNextState(RemoveDataNodeState.SUBMIT_REGION_MIGRATE); break; case SUBMIT_REGION_MIGRATE: @@ -95,11 +100,11 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod setNextState(RemoveDataNodeState.STOP_DATA_NODE); break; case STOP_DATA_NODE: - // TODO if region migrate is failed, don't execute STOP_DATA_NODE - LOG.info( - "{}, Begin to stop DataNode: {}", REMOVE_DATANODE_PROCESS, disableDataNodeLocation); - env.getDataNodeRemoveHandler().removeDataNodePersistence(disableDataNodeLocation); - env.getDataNodeRemoveHandler().stopDataNode(disableDataNodeLocation); + if (isAllRegionMigratedSuccessfully(env)) { + LOG.info("{}, Begin to stop DataNode: {}", REMOVE_DATANODE_PROCESS, removedDataNode); + handler.removeDataNodePersistence(removedDataNode); + handler.stopDataNode(removedDataNode); + } return Flow.NO_MORE_STATE; } } catch (Exception e) { @@ -107,10 +112,7 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod setFailure(new ProcedureException("Remove Data Node failed " + state)); } else { LOG.error( - "Retrievable error trying to remove data node {}, state {}", - disableDataNodeLocation, - state, - e); + "Retrievable error trying to remove data node {}, state {}", removedDataNode, state, e); if (getCycles() > RETRY_THRESHOLD) { setFailure(new ProcedureException("State stuck at " + state)); } @@ -119,6 +121,46 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod return Flow.HAS_MORE_STATE; } + private void submitChildRegionMigrate(ConfigNodeProcedureEnv 
env) { + migratedDataNodeRegions.forEach( + regionId -> { + TDataNodeLocation destDataNode = + env.getDataNodeRemoveHandler().findDestDataNode(regionId); + if (destDataNode != null) { + RegionMigrateProcedure regionMigrateProcedure = + new RegionMigrateProcedure(regionId, removedDataNode, destDataNode); + addChildProcedure(regionMigrateProcedure); + LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId); + } else { + LOG.error( + "{}, Cannot find target DataNode to migrate the region: {}", + REMOVE_DATANODE_PROCESS, + regionId); + // TODO terminate all the uncompleted remove datanode process + } + }); + } + + private boolean isAllRegionMigratedSuccessfully(ConfigNodeProcedureEnv env) { + List<TRegionReplicaSet> replicaSets = + env.getConfigManager().getPartitionManager().getAllReplicaSets(); + + List<TConsensusGroupId> migratedFailedRegions = + replicaSets.stream() + .filter(replica -> replica.getDataNodeLocations().contains(removedDataNode)) + .map(TRegionReplicaSet::getRegionId) + .collect(Collectors.toList()); + if (migratedFailedRegions.size() > 0) { + LOG.warn( + "{}, Some regions are migrated failed, the StopDataNode process should not be executed, migratedFailedRegions: {}", + REMOVE_DATANODE_PROCESS, + migratedFailedRegions); + return false; + } + + return true; + } + @Override protected void rollbackState(ConfigNodeProcedureEnv env, RemoveDataNodeState state) throws IOException, InterruptedException, ProcedureException {} @@ -157,9 +199,9 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod public void serialize(DataOutputStream stream) throws IOException { stream.writeShort(ProcedureType.REMOVE_DATA_NODE_PROCEDURE.getTypeCode()); super.serialize(stream); - ThriftCommonsSerDeUtils.serializeTDataNodeLocation(disableDataNodeLocation, stream); - stream.writeInt(execDataNodeRegionIds.size()); - execDataNodeRegionIds.forEach( + ThriftCommonsSerDeUtils.serializeTDataNodeLocation(removedDataNode, 
stream); + stream.writeInt(migratedDataNodeRegions.size()); + migratedDataNodeRegions.forEach( tid -> ThriftCommonsSerDeUtils.serializeTConsensusGroupId(tid, stream)); } @@ -167,11 +209,12 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod public void deserialize(ByteBuffer byteBuffer) { super.deserialize(byteBuffer); try { - disableDataNodeLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer); + removedDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer); int regionSize = byteBuffer.getInt(); - execDataNodeRegionIds = new ArrayList<>(regionSize); + migratedDataNodeRegions = new ArrayList<>(regionSize); for (int i = 0; i < regionSize; i++) { - execDataNodeRegionIds.add(ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer)); + migratedDataNodeRegions.add( + ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer)); } } catch (ThriftSerDeException e) { LOG.error("Error in deserialize RemoveConfigNodeProcedure", e); @@ -184,28 +227,9 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod RemoveDataNodeProcedure thatProc = (RemoveDataNodeProcedure) that; return thatProc.getProcId() == this.getProcId() && thatProc.getState() == this.getState() - && thatProc.disableDataNodeLocation.equals(this.disableDataNodeLocation); + && thatProc.removedDataNode.equals(this.removedDataNode) + && thatProc.migratedDataNodeRegions.equals(this.migratedDataNodeRegions); } return false; } - - private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) { - execDataNodeRegionIds.forEach( - regionId -> { - TDataNodeLocation destDataNode = - env.getDataNodeRemoveHandler().findDestDataNode(regionId); - if (destDataNode != null) { - RegionMigrateProcedure regionMigrateProcedure = - new RegionMigrateProcedure(regionId, disableDataNodeLocation, destDataNode); - addChildProcedure(regionMigrateProcedure); - LOG.info("Submit child procedure {} for regionId {}", 
regionMigrateProcedure, regionId); - } else { - LOG.error( - "{}, Cannot find target DataNode to remove the region: {}", - REMOVE_DATANODE_PROCESS, - regionId); - // TODO terminate all the uncompleted remove datanode process - } - }); - } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java index f15ed67489..07a498c7a1 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java @@ -98,7 +98,7 @@ public class RegionMigrateProcedure if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { waitForOneMigrationStepFinished(consensusGroupId, state); } else { - throw new ProcedureException("Failed to add region peer"); + throw new ProcedureException("ADD_REGION_PEER executed failed in DataNode"); } setNextState(RegionTransitionState.CHANGE_REGION_LEADER); break; @@ -111,7 +111,7 @@ public class RegionMigrateProcedure if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { waitForOneMigrationStepFinished(consensusGroupId, state); } else { - throw new ProcedureException("Failed to remove region peer"); + throw new ProcedureException("REMOVE_REGION_PEER executed failed in DataNode"); } setNextState(RegionTransitionState.DELETE_OLD_REGION_PEER); break; @@ -150,8 +150,7 @@ public class RegionMigrateProcedure "Procedure retried failed exceed 5 times, state stuck at " + state)); } - // meets exception in region migrate process - // terminate the process + // meets exception in region migrate process terminate the process return Flow.NO_MORE_STATE; } }
