This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/master2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/master2 by this push:
     new 1e106c97fe fix
1e106c97fe is described below

commit 1e106c97fe6ecbf9157e858b43843b755602d4a0
Author: Beyyes <[email protected]>
AuthorDate: Wed Nov 9 15:09:41 2022 +0800

    fix
---
 .../client/sync/SyncDataNodeClientPool.java        | 24 ++++++-------
 .../manager/load/balancer/RouteBalancer.java       |  5 +++
 .../procedure/env/DataNodeRemoveHandler.java       | 40 +++++++++++++++++++---
 .../impl/statemachine/RegionMigrateProcedure.java  | 19 +++++-----
 4 files changed, 61 insertions(+), 27 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index f70201db65..caa8528b4c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -84,7 +84,7 @@ public class SyncDataNodeClientPool {
   }
 
   public TSStatus sendSyncRequestToDataNodeWithGivenRetry(
-          TEndPoint endPoint, Object req, DataNodeRequestType requestType, int retryNum) {
+      TEndPoint endPoint, Object req, DataNodeRequestType requestType, int retryNum) {
     Throwable lastException = new TException();
     for (int retry = 0; retry < retryNum; retry++) {
       try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
@@ -92,22 +92,22 @@ public class SyncDataNodeClientPool {
       } catch (TException | IOException e) {
         lastException = e;
         LOGGER.warn(
-                "{} failed on DataNode {}, because {}, retrying {}...",
-                requestType,
-                endPoint,
-                e.getMessage(),
-                retry);
+            "{} failed on DataNode {}, because {}, retrying {}...",
+            requestType,
+            endPoint,
+            e.getMessage(),
+            retry);
         doRetryWait(retry);
       }
     }
     LOGGER.error("{} failed on DataNode {}", requestType, endPoint, lastException);
     return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
-            .setMessage("All retry failed due to: " + lastException.getMessage());
+        .setMessage("All retry failed due to: " + lastException.getMessage());
   }
 
-  private TSStatus executeSyncRequest(DataNodeRequestType requestType,
-                                      SyncDataNodeInternalServiceClient client,
-                                      Object req) throws TException {
+  private TSStatus executeSyncRequest(
+      DataNodeRequestType requestType, SyncDataNodeInternalServiceClient client, Object req)
+      throws TException {
     switch (requestType) {
       case INVALIDATE_PARTITION_CACHE:
         return client.invalidatePartitionCache((TInvalidateCacheReq) req);
@@ -139,7 +139,7 @@ public class SyncDataNodeClientPool {
         return client.deleteOldRegionPeer((TMaintainPeerReq) req);
       default:
         return RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType);
+            TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType);
     }
   }
 
@@ -162,7 +162,7 @@ public class SyncDataNodeClientPool {
    */
   public TSStatus changeRegionLeader(
       TConsensusGroupId regionId, TEndPoint dataNode, TDataNodeLocation newLeaderNode) {
-    LOGGER.info("send RPC to data node: {} for changing regions leader on it", dataNode);
+    LOGGER.info("Send RPC to data node: {} for changing regions leader on it", dataNode);
     TSStatus status;
     try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(dataNode)) {
       TRegionLeaderChangeReq req = new TRegionLeaderChangeReq(regionId, newLeaderNode);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index e9d21891a8..76b37dff45 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -313,6 +313,11 @@ public class RouteBalancer {
     }
   }
 
+  public void changeLeaderForMultiLeaderConsensus(
+      TConsensusGroupId regionGroupId, int newLeaderId) {
+    regionRouteMap.setLeader(regionGroupId, newLeaderId);
+  }
+
   private void changeRegionLeader(
       String consensusProtocolClass,
       AtomicInteger requestId,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 525d3fb783..394ed19a59 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
 import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
@@ -49,10 +50,13 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+import static org.apache.iotdb.consensus.ConsensusFactory.MULTI_LEADER_CONSENSUS;
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
 
 public class DataNodeRemoveHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRemoveHandler.class);
@@ -544,15 +548,43 @@ public class DataNodeRemoveHandler {
     configManager.getConsensusManager().write(new RemoveDataNodePlan(removeDataNodes));
   }
 
-  public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation originalDataNode) {
+  /**
+   * Change the leader of given Region
+   *
+   * @param regionId The region to be migrated
+   * @param originalDataNode The DataNode where the region locates
+   * @param migrateDestDataNode The DataNode where the region is to be migrated
+   */
+  public void changeRegionLeader(TConsensusGroupId regionId,
+                                 TDataNodeLocation originalDataNode,
+                                 TDataNodeLocation migrateDestDataNode) {
     Optional<TDataNodeLocation> newLeaderNode =
         filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
+
+    if (TConsensusGroupType.DataRegion.equals(regionId.getType()) &&
+            MULTI_LEADER_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
+      if (CONF.getDataReplicationFactor() == 1) {
+        configManager.getLoadManager().getRouteBalancer().
+                changeLeaderForMultiLeaderConsensus(regionId, migrateDestDataNode.getDataNodeId());
+      } else if (newLeaderNode.isPresent()) {
+        configManager.getLoadManager().getRouteBalancer().
+                changeLeaderForMultiLeaderConsensus(regionId, newLeaderNode.get().getDataNodeId());
+      }
+      LOGGER.info(
+              "{}, Change region leader finished for MULTI_LEADER_CONSENSUS, regionId: {}, newLeaderNode: {}",
+              REMOVE_DATANODE_PROCESS,
+              regionId,
+              newLeaderNode);
+
+      return;
+    }
+
     if (newLeaderNode.isPresent()) {
       SyncDataNodeClientPool.getInstance()
           .changeRegionLeader(
               regionId, originalDataNode.getInternalEndPoint(), newLeaderNode.get());
       LOGGER.info(
-          "{}, Change region leader finished, regionId: {}, newLeaderNode: {}",
+          "{}, Change region leader finished for RATIS_CONSENSUS, regionId: {}, newLeaderNode: {}",
           REMOVE_DATANODE_PROCESS,
           regionId,
           newLeaderNode);
@@ -610,8 +642,8 @@ public class DataNodeRemoveHandler {
    */
   private TSStatus checkClusterProtocol() {
     TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    if (CONF.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.SIMPLE_CONSENSUS)
-        || CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.SIMPLE_CONSENSUS)) {
+    if (CONF.getDataRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS)
+        || CONF.getSchemaRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS)) {
       status.setCode(TSStatusCode.REMOVE_DATANODE_FAILED.getStatusCode());
       status.setMessage("SimpleConsensus protocol is not supported to remove data node");
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 27e0d120b0..f15ed67489 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
 import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
@@ -82,17 +83,18 @@ public class RegionMigrateProcedure
       return Flow.NO_MORE_STATE;
     }
     TSStatus tsStatus;
+    DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler();
     try {
       switch (state) {
         case REGION_MIGRATE_PREPARE:
           setNextState(RegionTransitionState.CREATE_NEW_REGION_PEER);
           break;
         case CREATE_NEW_REGION_PEER:
-          env.getDataNodeRemoveHandler().createNewRegionPeer(consensusGroupId, destDataNode);
+          handler.createNewRegionPeer(consensusGroupId, destDataNode);
           setNextState(RegionTransitionState.ADD_REGION_PEER);
           break;
         case ADD_REGION_PEER:
-          tsStatus = env.getDataNodeRemoveHandler().addRegionPeer(destDataNode, consensusGroupId);
+          tsStatus = handler.addRegionPeer(destDataNode, consensusGroupId);
           if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
             waitForOneMigrationStepFinished(consensusGroupId, state);
           } else {
@@ -101,13 +103,11 @@ public class RegionMigrateProcedure
           setNextState(RegionTransitionState.CHANGE_REGION_LEADER);
           break;
         case CHANGE_REGION_LEADER:
-          env.getDataNodeRemoveHandler().changeRegionLeader(consensusGroupId, originalDataNode);
+          handler.changeRegionLeader(consensusGroupId, originalDataNode, destDataNode);
           setNextState(RegionTransitionState.REMOVE_REGION_PEER);
           break;
         case REMOVE_REGION_PEER:
-          tsStatus =
-              env.getDataNodeRemoveHandler()
-                  .removeRegionPeer(originalDataNode, destDataNode, consensusGroupId);
+          tsStatus = handler.removeRegionPeer(originalDataNode, destDataNode, consensusGroupId);
           if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
             waitForOneMigrationStepFinished(consensusGroupId, state);
           } else {
@@ -116,9 +116,7 @@ public class RegionMigrateProcedure
           setNextState(RegionTransitionState.DELETE_OLD_REGION_PEER);
           break;
         case DELETE_OLD_REGION_PEER:
-          tsStatus =
-              env.getDataNodeRemoveHandler()
-                  .deleteOldRegionPeer(originalDataNode, consensusGroupId);
+          tsStatus = handler.deleteOldRegionPeer(originalDataNode, consensusGroupId);
           if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
             waitForOneMigrationStepFinished(consensusGroupId, state);
           }
@@ -127,8 +125,7 @@ public class RegionMigrateProcedure
           setNextState(RegionTransitionState.UPDATE_REGION_LOCATION_CACHE);
           break;
         case UPDATE_REGION_LOCATION_CACHE:
-          env.getDataNodeRemoveHandler()
-              .updateRegionLocationCache(consensusGroupId, originalDataNode, destDataNode);
+          handler.updateRegionLocationCache(consensusGroupId, originalDataNode, destDataNode);
           return Flow.NO_MORE_STATE;
       }
     } catch (Exception e) {

Reply via email to