This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new f4750141851 [To rel/1.2] Fix Region migrate bug (#10862) (#10880)
f4750141851 is described below
commit f4750141851431987c0109af12a67d1ce3ad8932
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Aug 17 10:13:38 2023 +0800
[To rel/1.2] Fix Region migrate bug (#10862) (#10880)
---
.../partition/IoTDBAutoRegionGroupExtensionIT.java | 1 +
.../IoTDBAutoRegionGroupExtensionIT2.java | 160 +++++++++++++++++++++
.../region/GreedyRegionGroupAllocator.java | 41 ++++--
.../load/cache/node/DataNodeHeartbeatCache.java | 4 +-
.../load/cache/region/RegionHeartbeatSample.java | 5 +
.../partition/DatabasePartitionTable.java | 4 +-
.../persistence/partition/RegionGroup.java | 11 ++
.../procedure/env/ConfigNodeProcedureEnv.java | 11 ++
.../procedure/env/DataNodeRemoveHandler.java | 7 +-
.../iotdb/consensus/ratis/RatisConsensus.java | 20 ++-
.../iotdb/consensus/ratis/RatisConsensusTest.java | 4 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 10 +-
12 files changed, 250 insertions(+), 28 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
index 608fdaa68a2..9825d400a8c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
@@ -59,6 +59,7 @@ import static
org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.generateP
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
public class IoTDBAutoRegionGroupExtensionIT {
+
private static final String testDataRegionGroupExtensionPolicy = "AUTO";
private static final String testConsensusProtocolClass =
ConsensusFactory.RATIS_CONSENSUS;
private static final int testReplicationFactor = 1;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT2.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT2.java
new file mode 100644
index 00000000000..f091bafd542
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT2.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
+import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBAutoRegionGroupExtensionIT2 {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(IoTDBAutoRegionGroupExtensionIT2.class);
+
+ private static final String testDataRegionGroupExtensionPolicy = "AUTO";
+ private static final String testConsensusProtocolClass =
ConsensusFactory.IOT_CONSENSUS;
+ private static final int testReplicationFactor = 3;
+
+ private static final String database = "root.db";
+ private static final long testTimePartitionInterval = 604800000;
+ private static final int testMinDataRegionGroupNum = 3;
+ private static final int testDataNodeNum = 3;
+
+ @Before
+ public void setUp() throws Exception {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setDataRegionConsensusProtocolClass(testConsensusProtocolClass)
+ .setDataReplicationFactor(testReplicationFactor)
+ .setDataRegionGroupExtensionPolicy(testDataRegionGroupExtensionPolicy)
+ .setTimePartitionInterval(testTimePartitionInterval);
+ // Init 1C3D environment
+ EnvFactory.getEnv().initClusterEnvironment(1, testDataNodeNum);
+ }
+
+ @After
+ public void tearDown() {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testAutoRegionGroupExtensionPolicy2()
+ throws ClientManagerException, IOException, InterruptedException,
TException {
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ TSStatus status =
+ client.setDatabase(
+ new
TDatabaseSchema(database).setMinDataRegionGroupNum(testMinDataRegionGroupNum));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ // Shutdown 1 DataNode
+ EnvFactory.getEnv().shutdownDataNode(1);
+ EnvFactory.getEnv()
+ .ensureNodeStatus(
+
Collections.singletonList(EnvFactory.getEnv().getDataNodeWrapper(1)),
+ Collections.singletonList(NodeStatus.Unknown));
+
+ // Create 3 DataPartitions to extend 3 DataRegionGroups
+ for (int i = 0; i < testMinDataRegionGroupNum; i++) {
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>>
partitionSlotsMap =
+ ConfigNodeTestUtils.constructPartitionSlotsMap(
+ database, i, i + 1, i, i + 1, testTimePartitionInterval);
+ TDataPartitionReq dataPartitionReq = new
TDataPartitionReq(partitionSlotsMap);
+ TDataPartitionTableResp dataPartitionTableResp = null;
+ for (int retry = 0; retry < 5; retry++) {
+ // Build new Client since it's unstable in Win8 environment
+ try (SyncConfigNodeIServiceClient configNodeClient =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ dataPartitionTableResp =
+
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+ if (dataPartitionTableResp != null) {
+ break;
+ }
+ } catch (Exception e) {
+ // Retry sometimes in order to avoid request timeout
+ LOGGER.error(e.getMessage());
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ Assert.assertNotNull(dataPartitionTableResp);
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+ Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+ ConfigNodeTestUtils.checkDataPartitionTable(
+ database,
+ i,
+ i + 1,
+ i,
+ i + 1,
+ testTimePartitionInterval,
+ dataPartitionTableResp.getDataPartitionTable());
+ }
+
+ // Restart DataNode
+ EnvFactory.getEnv().startDataNode(1);
+
+ // Check DataRegionGroups
+ TShowRegionResp resp = client.showRegion(new TShowRegionReq());
+ Map<Integer, AtomicInteger> counter = new HashMap<>();
+ resp.getRegionInfoList()
+ .forEach(
+ regionInfo ->
+ counter
+ .computeIfAbsent(regionInfo.getDataNodeId(), empty ->
new AtomicInteger(0))
+ .incrementAndGet());
+ counter.forEach((dataNodeId, regionCount) -> Assert.assertEquals(3,
regionCount.get()));
+ }
+ }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
index 2e06fc26f1a..21b8b80bcbc 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java
@@ -25,6 +25,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -37,6 +40,8 @@ import static java.util.Map.Entry.comparingByValue;
/** Allocate Region Greedily */
public class GreedyRegionGroupAllocator implements IRegionGroupAllocator {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(GreedyRegionGroupAllocator.class);
+
private static final AtomicInteger ZERO = new AtomicInteger(0);
public GreedyRegionGroupAllocator() {
@@ -62,6 +67,7 @@ public class GreedyRegionGroupAllocator implements
IRegionGroupAllocator {
Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
Map<Integer, Double> freeDiskSpaceMap,
List<TRegionReplicaSet> allocatedRegionGroups) {
+
// Map<DataNodeId, Region count>
Map<Integer, AtomicInteger> regionCounter = new ConcurrentHashMap<>();
allocatedRegionGroups.forEach(
@@ -87,16 +93,29 @@ public class GreedyRegionGroupAllocator implements
IRegionGroupAllocator {
regionCounter.getOrDefault(dataNodeId, ZERO).get(),
freeDiskSpaceMap.getOrDefault(dataNodeId, 0d))));
- return priorityMap.entrySet().stream()
- .sorted(
- comparingByValue(
- (o1, o2) ->
- !Objects.equals(o1.getLeft(), o2.getLeft())
- // Compare the first key(The number of Regions) by
ascending order
- ? o1.getLeft() - o2.getLeft()
- // Compare the second key(The free disk space) by
descending order
- : (int) (o2.getRight() - o1.getRight())))
- .map(entry -> entry.getKey().deepCopy())
- .collect(Collectors.toList());
+ // Sort weightList
+ List<TDataNodeLocation> result =
+ priorityMap.entrySet().stream()
+ .sorted(
+ comparingByValue(
+ (o1, o2) ->
+ !Objects.equals(o1.getLeft(), o2.getLeft())
+ // Compare the first key(The number of Regions) by
ascending order
+ ? o1.getLeft() - o2.getLeft()
+ // Compare the second key(The free disk space) by
descending order
+ : (int) (o2.getRight() - o1.getRight())))
+ .map(entry -> entry.getKey().deepCopy())
+ .collect(Collectors.toList());
+
+ // Record weightList
+ for (TDataNodeLocation dataNodeLocation : result) {
+ LOGGER.info(
+ "[RegionGroupWeightList] DataNodeId: {}, RegionCount: {},
FreeDiskSpace: {}",
+ dataNodeLocation.getDataNodeId(),
+ priorityMap.get(dataNodeLocation).getLeft(),
+ priorityMap.get(dataNodeLocation).getRight());
+ }
+
+ return result;
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
index d3f3dc35c98..7e47edb4fd2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
@@ -54,7 +54,9 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
NodeStatus status = null;
String statusReason = null;
// TODO: Optimize judge logic
- if (System.currentTimeMillis() - lastSendTime > HEARTBEAT_TIMEOUT_TIME) {
+ if (lastSample != null &&
NodeStatus.Removing.equals(lastSample.getStatus())) {
+ status = NodeStatus.Removing;
+ } else if (System.currentTimeMillis() - lastSendTime >
HEARTBEAT_TIMEOUT_TIME) {
status = NodeStatus.Unknown;
} else if (lastSample != null) {
status = lastSample.getStatus();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
index e55497bc5f2..67c221fa706 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java
@@ -47,4 +47,9 @@ public class RegionHeartbeatSample {
public RegionStatus getStatus() {
return status;
}
+
+ public static RegionHeartbeatSample generateDefaultSample(RegionStatus
status) {
+ long currentTime = System.currentTimeMillis();
+ return new RegionHeartbeatSample(currentTime, currentTime, status);
+ }
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index a629569470a..bdca8aa7959 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -504,7 +504,7 @@ public class DatabasePartitionTable {
regionId);
return;
}
- regionGroup.getReplicaSet().getDataNodeLocations().add(node);
+ regionGroup.addRegionLocation(node);
}
private void removeRegionOldLocation(TConsensusGroupId regionId,
TDataNodeLocation node) {
@@ -525,7 +525,7 @@ public class DatabasePartitionTable {
regionId);
return;
}
- regionGroup.getReplicaSet().getDataNodeLocations().remove(node);
+ regionGroup.removeRegionLocation(node);
}
/**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
index a555c2349cd..81fea013c21 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.persistence.partition;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -74,6 +75,16 @@ public class RegionGroup {
return replicaSet.deepCopy();
}
+ public void addRegionLocation(TDataNodeLocation node) {
+ replicaSet.addToDataNodeLocations(node);
+ replicaSet.getDataNodeLocations().sort(TDataNodeLocation::compareTo);
+ }
+
+ public void removeRegionLocation(TDataNodeLocation node) {
+ replicaSet.getDataNodeLocations().remove(node);
+ replicaSet.getDataNodeLocations().sort(TDataNodeLocation::compareTo);
+ }
+
/** @param deltaMap Map<TSeriesPartitionSlot, Delta TTimePartitionSlot
Count> */
public void updateSlotCountMap(Map<TSeriesPartitionSlot, AtomicLong>
deltaMap) {
deltaMap.forEach(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 371243bc197..ee5fcccd681 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -401,6 +401,17 @@ public class ConfigNodeProcedureEnv {
NodeType.DataNode,
dataNodeLocation.getDataNodeId(),
NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing));
+ // Force update RegionStatus to Removing
+ getPartitionManager()
+ .getAllReplicaSets(dataNodeLocation.getDataNodeId())
+ .forEach(
+ replicaSet ->
+ getLoadManager()
+ .forceUpdateRegionGroupCache(
+ replicaSet.getRegionId(),
+ Collections.singletonMap(
+ dataNodeLocation.getDataNodeId(),
+
RegionHeartbeatSample.generateDefaultSample(RegionStatus.Removing))));
}
/**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index d4cde051a84..e7cf43a2c63 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -58,6 +58,7 @@ import static
org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
public class DataNodeRemoveHandler {
+
private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeRemoveHandler.class);
private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
@@ -402,7 +403,11 @@ public class DataNodeRemoveHandler {
private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
List<TDataNodeLocation> regionReplicaNodes) {
- return
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
+ List<TDataNodeConfiguration> dataNodeConfigurations =
+
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running);
+ // Randomly selected to ensure a basic load balancing
+ Collections.shuffle(dataNodeConfigurations);
+ return dataNodeConfigurations.stream()
.map(TDataNodeConfiguration::getLocation)
.filter(e -> !regionReplicaNodes.contains(e))
.findAny();
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index f6819b18097..d73964c7002 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -45,6 +45,7 @@ import
org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.RatisConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.NodeReadOnlyException;
import
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
@@ -73,6 +74,8 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SnapshotManagementRequest;
+import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
@@ -387,19 +390,18 @@ class RatisConsensus implements IConsensus {
@Override
public ConsensusGenericResponse createPeer(ConsensusGroupId groupId,
List<Peer> peers) {
RaftGroup group = buildRaftGroup(groupId, peers);
- // add RaftPeer myself to this RaftGroup
- return addNewGroupToServer(group, myself);
- }
-
- private ConsensusGenericResponse addNewGroupToServer(RaftGroup group,
RaftPeer server) {
RaftClientReply reply;
RaftGroup clientGroup =
- group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(),
server) : group;
+ group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(),
myself) : group;
try (RatisClient client = getRaftClient(clientGroup)) {
- reply =
client.getRaftClient().getGroupManagementApi(server.getId()).add(group);
+ reply =
client.getRaftClient().getGroupManagementApi(myself.getId()).add(group);
if (!reply.isSuccess()) {
return failed(new RatisRequestFailedException(reply.getException()));
}
+ } catch (AlreadyExistsException e) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new ConsensusGroupAlreadyExistException(groupId))
+ .build();
} catch (Exception e) {
return failed(new RatisRequestFailedException(e));
}
@@ -432,6 +434,10 @@ class RatisConsensus implements IConsensus {
if (!reply.isSuccess()) {
return failed(new RatisRequestFailedException(reply.getException()));
}
+ } catch (GroupMismatchException e) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new ConsensusGroupNotExistException(groupId))
+ .build();
} catch (IOException e) {
return failed(new RatisRequestFailedException(e));
}
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 245474c10e5..6d6b1742603 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -29,7 +29,7 @@ import
org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.config.RatisConfig;
-import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
+import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.ratis.util.TimeDuration;
import org.junit.After;
@@ -117,7 +117,7 @@ public class RatisConsensusTest {
ConsensusGenericResponse resp =
servers.get(0).createPeer(group.getGroupId(), original);
Assert.assertFalse(resp.isSuccess());
- Assert.assertTrue(resp.getException() instanceof
RatisRequestFailedException);
+ Assert.assertTrue(resp.getException() instanceof
ConsensusGroupAlreadyExistException);
// add 2 members
servers.get(1).createPeer(group.getGroupId(), Collections.emptyList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 33c6cfcee4d..ddddd2f783d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -51,7 +51,8 @@ import org.apache.iotdb.commons.udf.UDFInformation;
import org.apache.iotdb.commons.udf.service.UDFManagementService;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
@@ -1417,7 +1418,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
ConsensusGenericResponse response =
DataRegionConsensusImpl.getInstance().deletePeer(consensusGroupId);
if (!response.isSuccess()
- && !(response.getException() instanceof
PeerNotInConsensusGroupException)) {
+ && !(response.getException() instanceof
ConsensusGroupNotExistException)) {
return RpcUtils.getStatus(
TSStatusCode.DELETE_REGION_ERROR,
response.getException().getMessage());
}
@@ -1426,7 +1427,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
ConsensusGenericResponse response =
SchemaRegionConsensusImpl.getInstance().deletePeer(consensusGroupId);
if (!response.isSuccess()
- && !(response.getException() instanceof
PeerNotInConsensusGroupException)) {
+ && !(response.getException() instanceof
ConsensusGroupNotExistException)) {
return RpcUtils.getStatus(
TSStatusCode.DELETE_REGION_ERROR,
response.getException().getMessage());
}
@@ -1759,7 +1760,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
} else {
resp = SchemaRegionConsensusImpl.getInstance().createPeer(regionId,
peers);
}
- if (!resp.isSuccess()) {
+ if (!resp.isSuccess()
+ && !(resp.getException() instanceof
ConsensusGroupAlreadyExistException)) {
LOGGER.warn(
"{}, CreateNewRegionPeer error, peers: {}, regionId: {},
errorMessage",
REGION_MIGRATE_PROCESS,