This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch 9acb3594a1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cc34e9fe5f9d2e79c5fc2995c36649d132538085
Author: YongzaoDan <[email protected]>
AuthorDate: Wed Aug 16 23:43:13 2023 +0800

    Fix Region migrate bug (#10862)
---
 .../partition/IoTDBAutoRegionGroupExtensionIT.java |   1 +
 .../IoTDBAutoRegionGroupExtensionIT2.java          | 160 +++++++++++++++++++++
 .../region/GreedyRegionGroupAllocator.java         |  41 ++++--
 .../load/cache/node/DataNodeHeartbeatCache.java    |   4 +-
 .../load/cache/region/RegionHeartbeatSample.java   |   5 +
 .../partition/DatabasePartitionTable.java          |   4 +-
 .../persistence/partition/RegionGroup.java         |  11 ++
 .../procedure/env/ConfigNodeProcedureEnv.java      |  11 ++
 .../procedure/env/DataNodeRemoveHandler.java       |   7 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      |  20 ++-
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |   4 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  10 +-
 12 files changed, 250 insertions(+), 28 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
index 608fdaa68a2..9825d400a8c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
@@ -59,6 +59,7 @@ import static org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.generateP
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
 public class IoTDBAutoRegionGroupExtensionIT {
+
   private static final String testDataRegionGroupExtensionPolicy = "AUTO";
   private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS;
   private static final int testReplicationFactor = 1;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT2.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT2.java
new file mode 100644
index 00000000000..f091bafd542
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT2.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBAutoRegionGroupExtensionIT2 {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(IoTDBAutoRegionGroupExtensionIT2.class);
+
+  private static final String testDataRegionGroupExtensionPolicy = "AUTO";
+  private static final String testConsensusProtocolClass = ConsensusFactory.IOT_CONSENSUS;
+  private static final int testReplicationFactor = 3;
+
+  private static final String database = "root.db";
+  private static final long testTimePartitionInterval = 604800000;
+  private static final int testMinDataRegionGroupNum = 3;
+  private static final int testDataNodeNum = 3;
+
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setDataRegionConsensusProtocolClass(testConsensusProtocolClass)
+        .setDataReplicationFactor(testReplicationFactor)
+        .setDataRegionGroupExtensionPolicy(testDataRegionGroupExtensionPolicy)
+        .setTimePartitionInterval(testTimePartitionInterval);
+    // Init 1C3D environment
+    EnvFactory.getEnv().initClusterEnvironment(1, testDataNodeNum);
+  }
+
+  @After
+  public void tearDown() {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testAutoRegionGroupExtensionPolicy2()
+      throws ClientManagerException, IOException, InterruptedException, TException {
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+      TSStatus status =
+          client.setDatabase(
+              new TDatabaseSchema(database).setMinDataRegionGroupNum(testMinDataRegionGroupNum));
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+
+      // Shutdown 1 DataNode
+      EnvFactory.getEnv().shutdownDataNode(1);
+      EnvFactory.getEnv()
+          .ensureNodeStatus(
+              Collections.singletonList(EnvFactory.getEnv().getDataNodeWrapper(1)),
+              Collections.singletonList(NodeStatus.Unknown));
+
+      // Create 3 DataPartitions to extend 3 DataRegionGroups
+      for (int i = 0; i < testMinDataRegionGroupNum; i++) {
+        Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
+            ConfigNodeTestUtils.constructPartitionSlotsMap(
+                database, i, i + 1, i, i + 1, testTimePartitionInterval);
+        TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+        TDataPartitionTableResp dataPartitionTableResp = null;
+        for (int retry = 0; retry < 5; retry++) {
+          // Build new Client since it's unstable in Win8 environment
+          try (SyncConfigNodeIServiceClient configNodeClient =
+              (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+            dataPartitionTableResp =
+                configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+            if (dataPartitionTableResp != null) {
+              break;
+            }
+          } catch (Exception e) {
+            // Retry sometimes in order to avoid request timeout
+            LOGGER.error(e.getMessage());
+            TimeUnit.SECONDS.sleep(1);
+          }
+        }
+        Assert.assertNotNull(dataPartitionTableResp);
+        Assert.assertEquals(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+            dataPartitionTableResp.getStatus().getCode());
+        Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+        ConfigNodeTestUtils.checkDataPartitionTable(
+            database,
+            i,
+            i + 1,
+            i,
+            i + 1,
+            testTimePartitionInterval,
+            dataPartitionTableResp.getDataPartitionTable());
+      }
+
+      // Restart DataNode
+      EnvFactory.getEnv().startDataNode(1);
+
+      // Check DataRegionGroups
+      TShowRegionResp resp = client.showRegion(new TShowRegionReq());
+      Map<Integer, AtomicInteger> counter = new HashMap<>();
+      resp.getRegionInfoList()
+          .forEach(
+              regionInfo ->
+                  counter
+                      .computeIfAbsent(regionInfo.getDataNodeId(), empty -> new AtomicInteger(0))
+                      .incrementAndGet());
+      counter.forEach((dataNodeId, regionCount) -> Assert.assertEquals(3, regionCount.get()));
+    }
+  }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
index 2e06fc26f1a..21b8b80bcbc 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
@@ -25,6 +25,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.tsfile.utils.Pair;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -37,6 +40,8 @@ import static java.util.Map.Entry.comparingByValue;
 /** Allocate Region Greedily */
 public class GreedyRegionGroupAllocator implements IRegionGroupAllocator {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(GreedyRegionGroupAllocator.class);
+
   private static final AtomicInteger ZERO = new AtomicInteger(0);
 
   public GreedyRegionGroupAllocator() {
@@ -62,6 +67,7 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator {
       Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
       Map<Integer, Double> freeDiskSpaceMap,
       List<TRegionReplicaSet> allocatedRegionGroups) {
+
     // Map<DataNodeId, Region count>
     Map<Integer, AtomicInteger> regionCounter = new ConcurrentHashMap<>();
     allocatedRegionGroups.forEach(
@@ -87,16 +93,29 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator {
                         regionCounter.getOrDefault(dataNodeId, ZERO).get(),
                         freeDiskSpaceMap.getOrDefault(dataNodeId, 0d))));
 
-    return priorityMap.entrySet().stream()
-        .sorted(
-            comparingByValue(
-                (o1, o2) ->
-                    !Objects.equals(o1.getLeft(), o2.getLeft())
-                        // Compare the first key(The number of Regions) by ascending order
-                        ? o1.getLeft() - o2.getLeft()
-                        // Compare the second key(The free disk space) by descending order
-                        : (int) (o2.getRight() - o1.getRight())))
-        .map(entry -> entry.getKey().deepCopy())
-        .collect(Collectors.toList());
+    // Sort weightList
+    List<TDataNodeLocation> result =
+        priorityMap.entrySet().stream()
+            .sorted(
+                comparingByValue(
+                    (o1, o2) ->
+                        !Objects.equals(o1.getLeft(), o2.getLeft())
+                            // Compare the first key(The number of Regions) by ascending order
+                            ? o1.getLeft() - o2.getLeft()
+                            // Compare the second key(The free disk space) by descending order
+                            : (int) (o2.getRight() - o1.getRight())))
+            .map(entry -> entry.getKey().deepCopy())
+            .collect(Collectors.toList());
+
+    // Record weightList
+    for (TDataNodeLocation dataNodeLocation : result) {
+      LOGGER.info(
+          "[RegionGroupWeightList] DataNodeId: {}, RegionCount: {}, FreeDiskSpace: {}",
+          dataNodeLocation.getDataNodeId(),
+          priorityMap.get(dataNodeLocation).getLeft(),
+          priorityMap.get(dataNodeLocation).getRight());
+    }
+
+    return result;
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
index d3f3dc35c98..7e47edb4fd2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
@@ -54,7 +54,9 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
     NodeStatus status = null;
     String statusReason = null;
     // TODO: Optimize judge logic
-    if (System.currentTimeMillis() - lastSendTime > HEARTBEAT_TIMEOUT_TIME) {
+    if (lastSample != null && NodeStatus.Removing.equals(lastSample.getStatus())) {
+      status = NodeStatus.Removing;
+    } else if (System.currentTimeMillis() - lastSendTime > HEARTBEAT_TIMEOUT_TIME) {
       status = NodeStatus.Unknown;
     } else if (lastSample != null) {
       status = lastSample.getStatus();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
index e55497bc5f2..67c221fa706 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
@@ -47,4 +47,9 @@ public class RegionHeartbeatSample {
   public RegionStatus getStatus() {
     return status;
   }
+
+  public static RegionHeartbeatSample generateDefaultSample(RegionStatus status) {
+    long currentTime = System.currentTimeMillis();
+    return new RegionHeartbeatSample(currentTime, currentTime, status);
+  }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index a629569470a..bdca8aa7959 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -504,7 +504,7 @@ public class DatabasePartitionTable {
           regionId);
       return;
     }
-    regionGroup.getReplicaSet().getDataNodeLocations().add(node);
+    regionGroup.addRegionLocation(node);
   }
 
   private void removeRegionOldLocation(TConsensusGroupId regionId, TDataNodeLocation node) {
@@ -525,7 +525,7 @@ public class DatabasePartitionTable {
           regionId);
       return;
     }
-    regionGroup.getReplicaSet().getDataNodeLocations().remove(node);
+    regionGroup.removeRegionLocation(node);
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
index a555c2349cd..81fea013c21 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.confignode.persistence.partition;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -74,6 +75,16 @@ public class RegionGroup {
     return replicaSet.deepCopy();
   }
 
+  public void addRegionLocation(TDataNodeLocation node) {
+    replicaSet.addToDataNodeLocations(node);
+    replicaSet.getDataNodeLocations().sort(TDataNodeLocation::compareTo);
+  }
+
+  public void removeRegionLocation(TDataNodeLocation node) {
+    replicaSet.getDataNodeLocations().remove(node);
+    replicaSet.getDataNodeLocations().sort(TDataNodeLocation::compareTo);
+  }
+
   /** @param deltaMap Map<TSeriesPartitionSlot, Delta TTimePartitionSlot Count> */
   public void updateSlotCountMap(Map<TSeriesPartitionSlot, AtomicLong> deltaMap) {
     deltaMap.forEach(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 371243bc197..ee5fcccd681 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -401,6 +401,17 @@ public class ConfigNodeProcedureEnv {
             NodeType.DataNode,
             dataNodeLocation.getDataNodeId(),
             NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing));
+    // Force update RegionStatus to Removing
+    getPartitionManager()
+        .getAllReplicaSets(dataNodeLocation.getDataNodeId())
+        .forEach(
+            replicaSet ->
+                getLoadManager()
+                    .forceUpdateRegionGroupCache(
+                        replicaSet.getRegionId(),
+                        Collections.singletonMap(
+                            dataNodeLocation.getDataNodeId(),
+                            RegionHeartbeatSample.generateDefaultSample(RegionStatus.Removing))));
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index d4cde051a84..e7cf43a2c63 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -58,6 +58,7 @@ import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
 import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
 
 public class DataNodeRemoveHandler {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRemoveHandler.class);
 
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
@@ -402,7 +403,11 @@ public class DataNodeRemoveHandler {
 
   private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
       List<TDataNodeLocation> regionReplicaNodes) {
-    return configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
+    List<TDataNodeConfiguration> dataNodeConfigurations =
+        configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running);
+    // Randomly selected to ensure a basic load balancing
+    Collections.shuffle(dataNodeConfigurations);
+    return dataNodeConfigurations.stream()
         .map(TDataNodeConfiguration::getLocation)
         .filter(e -> !regionReplicaNodes.contains(e))
         .findAny();
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 27c8bb0d271..055db6c0bb1 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.RatisConfig;
 import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.NodeReadOnlyException;
 import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
@@ -73,6 +74,8 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.SnapshotManagementRequest;
+import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
@@ -392,19 +395,18 @@ class RatisConsensus implements IConsensus {
   @Override
   public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) {
     RaftGroup group = buildRaftGroup(groupId, peers);
-    // add RaftPeer myself to this RaftGroup
-    return addNewGroupToServer(group, myself);
-  }
-
-  private ConsensusGenericResponse addNewGroupToServer(RaftGroup group, RaftPeer server) {
     RaftClientReply reply;
     RaftGroup clientGroup =
-        group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(), server) : group;
+        group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(), myself) : group;
     try (RatisClient client = getRaftClient(clientGroup)) {
-      reply = client.getRaftClient().getGroupManagementApi(server.getId()).add(group);
+      reply = client.getRaftClient().getGroupManagementApi(myself.getId()).add(group);
       if (!reply.isSuccess()) {
         return failed(new RatisRequestFailedException(reply.getException()));
       }
+    } catch (AlreadyExistsException e) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupAlreadyExistException(groupId))
+          .build();
     } catch (Exception e) {
       return failed(new RatisRequestFailedException(e));
     }
@@ -437,6 +439,10 @@ class RatisConsensus implements IConsensus {
       if (!reply.isSuccess()) {
         return failed(new RatisRequestFailedException(reply.getException()));
       }
+    } catch (GroupMismatchException e) {
+      return ConsensusGenericResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
     } catch (IOException e) {
       return failed(new RatisRequestFailedException(e));
     }
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 245474c10e5..6d6b1742603 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.RatisConfig;
-import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 
 import org.apache.ratis.util.TimeDuration;
 import org.junit.After;
@@ -117,7 +117,7 @@ public class RatisConsensusTest {
 
     ConsensusGenericResponse resp = servers.get(0).createPeer(group.getGroupId(), original);
     Assert.assertFalse(resp.isSuccess());
-    Assert.assertTrue(resp.getException() instanceof RatisRequestFailedException);
+    Assert.assertTrue(resp.getException() instanceof ConsensusGroupAlreadyExistException);
 
     // add 2 members
     servers.get(1).createPeer(group.getGroupId(), Collections.emptyList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 33c6cfcee4d..ddddd2f783d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -51,7 +51,8 @@ import org.apache.iotdb.commons.udf.UDFInformation;
 import org.apache.iotdb.commons.udf.service.UDFManagementService;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.db.auth.AuthorizerManager;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
@@ -1417,7 +1418,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
       ConsensusGenericResponse response =
           DataRegionConsensusImpl.getInstance().deletePeer(consensusGroupId);
       if (!response.isSuccess()
-          && !(response.getException() instanceof PeerNotInConsensusGroupException)) {
+          && !(response.getException() instanceof ConsensusGroupNotExistException)) {
         return RpcUtils.getStatus(
             TSStatusCode.DELETE_REGION_ERROR, response.getException().getMessage());
       }
@@ -1426,7 +1427,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
       ConsensusGenericResponse response =
           SchemaRegionConsensusImpl.getInstance().deletePeer(consensusGroupId);
       if (!response.isSuccess()
-          && !(response.getException() instanceof PeerNotInConsensusGroupException)) {
+          && !(response.getException() instanceof ConsensusGroupNotExistException)) {
         return RpcUtils.getStatus(
             TSStatusCode.DELETE_REGION_ERROR, response.getException().getMessage());
       }
@@ -1759,7 +1760,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     } else {
       resp = SchemaRegionConsensusImpl.getInstance().createPeer(regionId, peers);
     }
-    if (!resp.isSuccess()) {
+    if (!resp.isSuccess()
+        && !(resp.getException() instanceof ConsensusGroupAlreadyExistException)) {
       LOGGER.warn(
           "{}, CreateNewRegionPeer error, peers: {}, regionId: {}, errorMessage",
           REGION_MIGRATE_PROCESS,

Reply via email to