This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/master1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 399a961b8da27175ac1f4cd024522a3a1ee9f042 Author: Beyyes <[email protected]> AuthorDate: Sat Nov 5 17:08:13 2022 +0800 perfect remove process --- .../persistence/partition/PartitionInfo.java | 2 +- .../partition/StorageGroupPartitionTable.java | 6 +- .../procedure/env/DataNodeRemoveHandler.java | 67 +++++++++++----------- .../impl/node/RemoveDataNodeProcedure.java | 6 ++ 4 files changed, 42 insertions(+), 39 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index 812393b0a2..82a461375a 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -470,7 +470,7 @@ public class PartitionInfo implements SnapshotProcessor { } /** - * update a region location + * Update the location info of given regionId * * @param req UpdateRegionLocationReq * @return TSStatus diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java index 5ffd0881f5..5e98a80c3d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java @@ -432,7 +432,7 @@ public class StorageGroupPartitionTable { private void addRegionNewLocation(TConsensusGroupId regionId, TDataNodeLocation node) { RegionGroup regionGroup = regionGroupMap.get(regionId); if (regionGroup == null) { - LOGGER.warn("not find Region Group for region {}", regionId); + LOGGER.warn("Cannot find RegionGroup in addRegionNewLocation for region {}", regionId); return; } if (regionGroup.getReplicaSet().getDataNodeLocations().contains(node)) { @@ -445,12 +445,12 @@ public class StorageGroupPartitionTable { private void removeRegionOldLocation(TConsensusGroupId regionId, TDataNodeLocation node) { RegionGroup regionGroup = regionGroupMap.get(regionId); if (regionGroup == null) { - LOGGER.warn("not find Region Group for region {}", regionId); + LOGGER.warn("Cannot find RegionGroup in removeRegionOldLocation for region {}", regionId); return; } if (!regionGroup.getReplicaSet().getDataNodeLocations().contains(node)) { LOGGER.info( - "Node is Not in region locations, no need to remove it, node: {}, region: {}", + "Node is not in region locations, no need to remove it, node: {}, region: {}", node, regionId); return; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java index a5f1cc5b2e..8a48624046 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java @@ -138,11 +138,11 @@ public class DataNodeRemoveHandler { */ public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) { TSStatus status; - List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId); + List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId); if (regionReplicaNodes.isEmpty()) { - LOGGER.warn("Not find region replica nodes, region: {}", regionId); + LOGGER.warn("Cannot find region replica nodes, region: {}", regionId); status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); - status.setMessage("not find region replica nodes, region: " + regionId); + status.setMessage("Cannot find region replica nodes, region: " + regionId); return null; } @@ -167,10 +167,10 @@ public class DataNodeRemoveHandler { */ public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocation destDataNode) { TSStatus status; - List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId); + List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId); if (regionReplicaNodes.isEmpty()) { LOGGER.warn( - "{}, Not find region replica nodes in createPeer, regionId: {}", + "{}, Cannot find region replica nodes in createPeer, regionId: {}", REMOVE_DATANODE_PROCESS, regionId); status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); @@ -354,40 +354,40 @@ public class DataNodeRemoveHandler { TDataNodeLocation originalDataNode, TDataNodeLocation destDataNode) { LOGGER.info( - "Start to update region {} location from {} to {} when it migrate succeed", + "Start to updateRegionLocationCache {} location from {} to {} when it migrate succeed", regionId, - originalDataNode.getInternalEndPoint().getIp(), - destDataNode.getInternalEndPoint().getIp()); + getIdWithRpcEndpoint(originalDataNode), + getIdWithRpcEndpoint(destDataNode)); UpdateRegionLocationPlan req = new UpdateRegionLocationPlan(regionId, originalDataNode, destDataNode); TSStatus status = configManager.getPartitionManager().updateRegionLocation(req); LOGGER.info( - "Update region {} location finished, result:{}, old:{}, new:{}", + "UpdateRegionLocationCache finished, region:{}, result:{}, old:{}, new:{}", regionId, status, - originalDataNode.getInternalEndPoint().getIp(), - destDataNode.getInternalEndPoint().getIp()); + getIdWithRpcEndpoint(originalDataNode), + getIdWithRpcEndpoint(destDataNode)); + // Broadcast the latest RegionRouteMap when Region migration finished configManager.getLoadManager().broadcastLatestRegionRouteMap(); } /** - * Find region replication Nodes + * Find all DataNodes which contains the given regionId * * @param regionId region id - * @return data node location + * @return DataNode locations */ - public List<TDataNodeLocation> findRegionReplicaNodes(TConsensusGroupId regionId) { - List<TRegionReplicaSet> regionReplicaSets = + public List<TDataNodeLocation> findRegionLocations(TConsensusGroupId regionId) { + Optional<TRegionReplicaSet> regionReplicaSet = configManager.getPartitionManager().getAllReplicaSets().stream() .filter(rg -> rg.regionId.equals(regionId)) - .collect(Collectors.toList()); - if (regionReplicaSets.isEmpty()) { - LOGGER.warn("not find TRegionReplica for region: {}", regionId); - return Collections.emptyList(); + .findAny(); + if (regionReplicaSet.isPresent()) { + return regionReplicaSet.get().getDataNodeLocations(); } - return regionReplicaSets.get(0).getDataNodeLocations(); + return Collections.emptyList(); } private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion( @@ -571,31 +571,28 @@ public class DataNodeRemoveHandler { */ private Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica( TConsensusGroupId regionId, TDataNodeLocation filterLocation) { - List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId); - if (regionReplicaNodes.isEmpty()) { - LOGGER.warn("Not find region replica nodes, region: {}", regionId); + List<TDataNodeLocation> regionLocations = findRegionLocations(regionId); + if (regionLocations.isEmpty()) { + LOGGER.warn("Cannot find DataNodes contain the given region: {}", regionId); return Optional.empty(); } + // Choosing the RUNNING DataNodes to execute firstly + // If all DataNodes are not RUNNING, then choose the REMOVING DataNodes secondly List<TDataNodeLocation> aliveDataNodes = configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream() .map(TDataNodeConfiguration::getLocation) .collect(Collectors.toList()); - // filter the RUNNING datanode firstly - // if all the datanodes are not in RUNNING status, choose the REMOVING datanode - // because REMOVING datanode is also alive, it can execute rpc request - if (aliveDataNodes.isEmpty()) { - aliveDataNodes = - configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream() - .map(TDataNodeConfiguration::getLocation) - .collect(Collectors.toList()); - } + aliveDataNodes.addAll( + configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream() + .map(TDataNodeConfiguration::getLocation) + .collect(Collectors.toList())); // TODO return the node which has lowest load. - for (TDataNodeLocation regionReplicaNode : regionReplicaNodes) { - if (aliveDataNodes.contains(regionReplicaNode) && !regionReplicaNode.equals(filterLocation)) { - return Optional.of(regionReplicaNode); + for (TDataNodeLocation aliveDataNode : aliveDataNodes) { + if (regionLocations.contains(aliveDataNode) && !aliveDataNode.equals(filterLocation)) { + return Optional.of(aliveDataNode); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java index f8c6f7b83c..fe59a56d97 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java @@ -196,6 +196,12 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod new RegionMigrateProcedure(regionId, disableDataNodeLocation, destDataNode); addChildProcedure(regionMigrateProcedure); LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId); + } else { + LOG.error( + "{}, Cannot find target DataNode to remove the region: {}", + REMOVE_DATANODE_PROCESS, + regionId); + // TODO terminate all the uncompleted remove datanode process } }); }
