This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/remove_node_stability
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/remove_node_stability 
by this push:
     new 20dd3fab35 can use REMOVING data node to execute addRegionPeer in 
remove datanode process
20dd3fab35 is described below

commit 20dd3fab353f73139fbaea2039f1e38432c2f7b0
Author: Beyyes <[email protected]>
AuthorDate: Tue Oct 11 20:13:52 2022 +0800

    can use REMOVING data node to execute addRegionPeer in remove datanode 
process
---
 .../iotdb/confignode/persistence/NodeInfo.java     |  8 ++-
 .../procedure/env/DataNodeRemoveHandler.java       | 73 +++++++++++-----------
 .../impl/statemachine/RegionMigrateProcedure.java  | 40 ++++++++----
 3 files changed, 71 insertions(+), 50 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index fd64713896..7adcf7e389 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -62,6 +62,8 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+
 /**
  * The NodeInfo stores cluster node information. The cluster node information 
including: 1. DataNode
  * information 2. ConfigNode information
@@ -167,7 +169,8 @@ public class NodeInfo implements SnapshotProcessor {
    */
   public TSStatus removeDataNode(RemoveDataNodePlan req) {
     LOGGER.info(
-        "there are {} data node in cluster before executed remove-datanode.sh",
+        "{}, There are {} data node in cluster before executed 
remove-datanode.sh",
+        REMOVE_DATANODE_PROCESS,
         registeredDataNodes.size());
     try {
       dataNodeInfoReadWriteLock.writeLock().lock();
@@ -181,7 +184,8 @@ public class NodeInfo implements SnapshotProcessor {
       dataNodeInfoReadWriteLock.writeLock().unlock();
     }
     LOGGER.info(
-        "there are {} data node in cluster after executed remove-datanode.sh",
+        "{}, There are {} data node in cluster after executed 
remove-datanode.sh",
+        REMOVE_DATANODE_PROCESS,
         registeredDataNodes.size());
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index f816814bb9..9a7302b8f9 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -69,6 +69,12 @@ public class DataNodeRemoveHandler {
     this.configManager = configManager;
   }
 
+  public static String getIdWithRpcEndpoint(TDataNodeLocation location) {
+    return String.format(
+        "[dataNodeId: %s, clientRpcEndPoint: %s]",
+        location.getDataNodeId(), location.getClientRpcEndPoint());
+  }
+
   /**
    * Get all consensus group id in this node
    *
@@ -120,7 +126,8 @@ public class DataNodeRemoveHandler {
     }
 
     LOGGER.info(
-        "DataNodeRemoveService finished broadcastDisableDataNode to cluster, 
disabledDataNode: {}",
+        "{}, DataNodeRemoveService finished broadcastDisableDataNode to 
cluster, disabledDataNode: {}",
+        REMOVE_DATANODE_PROCESS,
         getIdWithRpcEndpoint(disabledDataNode));
   }
 
@@ -187,16 +194,16 @@ public class DataNodeRemoveHandler {
                 DataNodeRequestType.CREATE_NEW_REGION_PEER);
 
     LOGGER.info(
-        "{}, Send action createNewRegionPeer finished, regionId: {}, 
destDataNode: {}",
+        "{}, Send action createNewRegionPeer finished, regionId: {}, 
newPeerDataNodeId: {}",
         REMOVE_DATANODE_PROCESS,
         regionId,
-        destDataNode);
+        getIdWithRpcEndpoint(destDataNode));
     if (isFailed(status)) {
       LOGGER.error(
-          "{}, Send action createNewRegionPeer error, regionId: {}, 
destDataNode: {}, result: {}",
+          "{}, Send action createNewRegionPeer error, regionId: {}, 
newPeerDataNodeId: {}, result: {}",
           REMOVE_DATANODE_PROCESS,
           regionId,
-          destDataNode,
+          getIdWithRpcEndpoint(destDataNode),
           status);
     }
     return status;
@@ -223,13 +230,13 @@ public class DataNodeRemoveHandler {
     if (!selectedDataNode.isPresent()) {
       LOGGER.warn(
           "{}, There are no other DataNodes could be selected to perform the 
add peer process, "
-              + "please check RegionGroup: {} by SQL: show regions",
+              + "please check RegionGroup: {} by show regions sql command",
           REMOVE_DATANODE_PROCESS,
           regionId);
       status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
       status.setMessage(
           "There are no other DataNodes could be selected to perform the add 
peer process, "
-              + "please check by show regions command");
+              + "please check by show regions sql command");
       return status;
     }
 
@@ -306,19 +313,6 @@ public class DataNodeRemoveHandler {
   public TSStatus deleteOldRegionPeer(
       TDataNodeLocation originalDataNode, TConsensusGroupId regionId) {
 
-    // when SchemaReplicationFactor==1, execute deleteOldRegionPeer method 
will cause error
-    // user must delete the related data manually
-    if (CONF.getSchemaReplicationFactor() == 1
-        && TConsensusGroupType.SchemaRegion.equals(regionId.getType())) {
-      String errorMessage =
-          "deleteOldRegionPeer is not supported for schemaRegion when 
SchemaReplicationFactor equals 1, "
-              + "you are supposed to delete the region data of datanode 
manually";
-      LOGGER.info("{}, {}", REMOVE_DATANODE_PROCESS, errorMessage);
-      TSStatus status = new 
TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage(errorMessage);
-      return status;
-    }
-
     // when DataReplicationFactor==1, execute deleteOldRegionPeer method will 
cause error
     // user must delete the related data manually
     // TODO if multi-leader supports deleteOldRegionPeer when 
DataReplicationFactor==1?
@@ -421,14 +415,14 @@ public class DataNodeRemoveHandler {
    * @throws ProcedureException procedure exception
    */
   public TSStatus stopDataNode(TDataNodeLocation dataNode) throws 
ProcedureException {
-    LOGGER.info("Begin to stop Data Node {}", dataNode);
+    LOGGER.info("{}, Begin to stop Data Node {}", REMOVE_DATANODE_PROCESS, 
dataNode);
     
AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
     TSStatus status =
         SyncDataNodeClientPool.getInstance()
             .sendSyncRequestToDataNodeWithRetry(
                 dataNode.getInternalEndPoint(), dataNode, 
DataNodeRequestType.STOP_DATA_NODE);
     configManager.getNodeManager().removeNodeCache(dataNode.getDataNodeId());
-    LOGGER.info("stop Data Node {} result: {}", dataNode, status);
+    LOGGER.info("{}, Stop Data Node {} result: {}", REMOVE_DATANODE_PROCESS, 
dataNode, status);
     return status;
   }
 
@@ -547,15 +541,16 @@ public class DataNodeRemoveHandler {
     configManager.getConsensusManager().write(new 
RemoveDataNodePlan(removeDataNodes));
   }
 
-  public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation 
tDataNodeLocation) {
+  public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation 
originalDataNode) {
     Optional<TDataNodeLocation> newLeaderNode =
-        filterDataNodeWithOtherRegionReplica(regionId, tDataNodeLocation);
+        filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
     if (newLeaderNode.isPresent()) {
       SyncDataNodeClientPool.getInstance()
           .changeRegionLeader(
-              regionId, tDataNodeLocation.getInternalEndPoint(), 
newLeaderNode.get());
+              regionId, originalDataNode.getInternalEndPoint(), 
newLeaderNode.get());
       LOGGER.info(
-          "Change region leader finished, region is {}, newLeaderNode is {}",
+          "{}, Change region leader finished, regionId: {}, newLeaderNode: {}",
+          REMOVE_DATANODE_PROCESS,
           regionId,
           newLeaderNode);
     }
@@ -582,16 +577,24 @@ public class DataNodeRemoveHandler {
             .map(TDataNodeConfiguration::getLocation)
             .collect(Collectors.toList());
 
-    // TODO replace findAny() by select the low load node.
-    return regionReplicaNodes.stream()
-        .filter(e -> aliveDataNodes.contains(e) && !e.equals(filterLocation))
-        .findAny();
-  }
+    // filter the RUNNING datanode firstly
+    // if all the datanodes are not in RUNNING status, choose the REMOVING 
datanode
+    // because REMOVING datanode is also alive, it can execute rpc request
+    if (aliveDataNodes.isEmpty()) {
+      aliveDataNodes =
+          
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream()
+              .map(TDataNodeConfiguration::getLocation)
+              .collect(Collectors.toList());
+    }
 
-  private String getIdWithRpcEndpoint(TDataNodeLocation location) {
-    return String.format(
-        "dataNodeId: %s, clientRpcEndPoint: %s",
-        location.getDataNodeId(), location.getClientRpcEndPoint());
+    // TODO return the node which has lowest load.
+    for (TDataNodeLocation regionReplicaNode : regionReplicaNodes) {
+      if (aliveDataNodes.contains(regionReplicaNode) && 
!regionReplicaNode.equals(filterLocation)) {
+        return Optional.of(regionReplicaNode);
+      }
+    }
+
+    return Optional.empty();
   }
 
   /**
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 6d4343c084..2d17ffe374 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -39,6 +39,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
 import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
 
 /** region migrate procedure */
@@ -92,8 +93,7 @@ public class RegionMigrateProcedure
         case ADD_REGION_PEER:
           tsStatus = 
env.getDataNodeRemoveHandler().addRegionPeer(destDataNode, consensusGroupId);
           if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
-            waitForOneMigrationStepFinished(consensusGroupId);
-            LOG.info("Wait for ADD_REGION_PEER finished, regionId: {}", 
consensusGroupId);
+            waitForOneMigrationStepFinished(consensusGroupId, state);
           } else {
             throw new ProcedureException("Failed to add region peer");
           }
@@ -108,8 +108,7 @@ public class RegionMigrateProcedure
               env.getDataNodeRemoveHandler()
                   .removeRegionPeer(originalDataNode, destDataNode, 
consensusGroupId);
           if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
-            waitForOneMigrationStepFinished(consensusGroupId);
-            LOG.info("Wait REMOVE_REGION_PEER finished, regionId: {}", 
consensusGroupId);
+            waitForOneMigrationStepFinished(consensusGroupId, state);
           } else {
             throw new ProcedureException("Failed to remove region peer");
           }
@@ -120,8 +119,7 @@ public class RegionMigrateProcedure
               env.getDataNodeRemoveHandler()
                   .deleteOldRegionPeer(originalDataNode, consensusGroupId);
           if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
-            waitForOneMigrationStepFinished(consensusGroupId);
-            LOG.info("Wait for DELETE_OLD_REGION_PEER finished, regionId: {}", 
consensusGroupId);
+            waitForOneMigrationStepFinished(consensusGroupId, state);
           }
           // remove consensus group after a node stop, which will be failed, 
but we will
           // continuously execute.
@@ -134,15 +132,18 @@ public class RegionMigrateProcedure
       }
     } catch (Exception e) {
       LOG.error(
-          "Meets error in region migrate state, please do the rollback 
operation yourself manually according to the error message!!! "
+          "{}, Meets error in region migrate state, "
+              + "please do the rollback operation yourself manually according 
to the error message!!! "
               + "error state: {}, migrateResult: {}",
+          REMOVE_DATANODE_PROCESS,
           state,
           migrateResult);
       if (isRollbackSupported(state)) {
         setFailure(new ProcedureException("Region migrate failed at state: " + 
state));
       } else {
         LOG.error(
-            "Failed state is not support rollback, filed state {}, 
originalDataNode: {}",
+            "{}, Failed state is not support rollback, filed state {}, 
originalDataNode: {}",
+            REMOVE_DATANODE_PROCESS,
             state,
             originalDataNode);
         if (getCycles() > RETRY_THRESHOLD) {
@@ -249,8 +250,15 @@ public class RegionMigrateProcedure
     return false;
   }
 
-  public TSStatus waitForOneMigrationStepFinished(TConsensusGroupId 
consensusGroupId)
-      throws Exception {
+  public TSStatus waitForOneMigrationStepFinished(
+      TConsensusGroupId consensusGroupId, RegionTransitionState state) throws 
Exception {
+
+    LOG.info(
+        "{}, Wait for state {} finished, regionId: {}",
+        REMOVE_DATANODE_PROCESS,
+        state,
+        consensusGroupId);
+
     TSStatus status = new TSStatus(SUCCESS_STATUS.getStatusCode());
     synchronized (regionMigrateLock) {
       try {
@@ -262,7 +270,7 @@ public class RegionMigrateProcedure
               String.format("Region migrate failed, regionId: %s", 
consensusGroupId));
         }
       } catch (InterruptedException e) {
-        LOG.error("region migrate {} interrupt", consensusGroupId, e);
+        LOG.error("{}, region migrate {} interrupt", REMOVE_DATANODE_PROCESS, 
consensusGroupId, e);
         Thread.currentThread().interrupt();
         status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
         status.setMessage("wait region migrate interrupt," + e.getMessage());
@@ -274,14 +282,20 @@ public class RegionMigrateProcedure
   /** DataNode report region migrate result to ConfigNode, and continue */
   public void notifyTheRegionMigrateFinished(TRegionMigrateResultReportReq 
req) {
 
-    LOG.info("ConfigNode received DataNode reported region migrate result: {} 
", req);
+    LOG.info(
+        "{}, ConfigNode received DataNode reported region migrate result: {}",
+        REMOVE_DATANODE_PROCESS,
+        req);
 
     // TODO the req is used in roll back
     synchronized (regionMigrateLock) {
       TSStatus migrateStatus = req.getMigrateResult();
       // migrate failed
       if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
-        LOG.info("Region migrate executed failed in DataNode, migrateStatus: 
{}", migrateStatus);
+        LOG.info(
+            "{}, Region migrate executed failed in DataNode, migrateStatus: 
{}",
+            REMOVE_DATANODE_PROCESS,
+            migrateStatus);
         migrateSuccess = false;
         migrateResult = migrateStatus.toString();
       }

Reply via email to