This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch 9acb3594a1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cc34e9fe5f9d2e79c5fc2995c36649d132538085 Author: YongzaoDan <[email protected]> AuthorDate: Wed Aug 16 23:43:13 2023 +0800 Fix Region migrate bug (#10862) --- .../partition/IoTDBAutoRegionGroupExtensionIT.java | 1 + .../IoTDBAutoRegionGroupExtensionIT2.java | 160 +++++++++++++++++++++ .../region/GreedyRegionGroupAllocator.java | 41 ++++-- .../load/cache/node/DataNodeHeartbeatCache.java | 4 +- .../load/cache/region/RegionHeartbeatSample.java | 5 + .../partition/DatabasePartitionTable.java | 4 +- .../persistence/partition/RegionGroup.java | 11 ++ .../procedure/env/ConfigNodeProcedureEnv.java | 11 ++ .../procedure/env/DataNodeRemoveHandler.java | 7 +- .../iotdb/consensus/ratis/RatisConsensus.java | 20 ++- .../iotdb/consensus/ratis/RatisConsensusTest.java | 4 +- .../impl/DataNodeInternalRPCServiceImpl.java | 10 +- 12 files changed, 250 insertions(+), 28 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java index 608fdaa68a2..9825d400a8c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java @@ -59,6 +59,7 @@ import static org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils.generateP @RunWith(IoTDBTestRunner.class) @Category({ClusterIT.class}) public class IoTDBAutoRegionGroupExtensionIT { + private static final String testDataRegionGroupExtensionPolicy = "AUTO"; private static final String testConsensusProtocolClass = ConsensusFactory.RATIS_CONSENSUS; private static final int testReplicationFactor = 1; diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT2.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT2.java new file mode 100644 index 00000000000..f091bafd542 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT2.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.partition; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.commons.client.exception.ClientManagerException; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.thrift.TException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBAutoRegionGroupExtensionIT2 { + + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBAutoRegionGroupExtensionIT2.class); + + private static final String testDataRegionGroupExtensionPolicy = "AUTO"; + private static final String testConsensusProtocolClass = ConsensusFactory.IOT_CONSENSUS; + private static final int testReplicationFactor = 3; + + private static final String database = "root.db"; + private static final long testTimePartitionInterval = 604800000; + private static final int testMinDataRegionGroupNum = 3; + private static final int testDataNodeNum = 3; + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataRegionConsensusProtocolClass(testConsensusProtocolClass) + .setDataReplicationFactor(testReplicationFactor) + .setDataRegionGroupExtensionPolicy(testDataRegionGroupExtensionPolicy) + .setTimePartitionInterval(testTimePartitionInterval); + // Init 1C3D environment + EnvFactory.getEnv().initClusterEnvironment(1, testDataNodeNum); + } + + @After + public void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testAutoRegionGroupExtensionPolicy2() + throws ClientManagerException, IOException, InterruptedException, TException { + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + TSStatus status = + client.setDatabase( + new TDatabaseSchema(database).setMinDataRegionGroupNum(testMinDataRegionGroupNum)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + // Shutdown 1 DataNode + EnvFactory.getEnv().shutdownDataNode(1); + EnvFactory.getEnv() + .ensureNodeStatus( + Collections.singletonList(EnvFactory.getEnv().getDataNodeWrapper(1)), + Collections.singletonList(NodeStatus.Unknown)); + + // Create 3 DataPartitions to extend 3 DataRegionGroups + for (int i = 0; i < testMinDataRegionGroupNum; i++) { + Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = + ConfigNodeTestUtils.constructPartitionSlotsMap( + database, i, i + 1, i, i + 1, testTimePartitionInterval); + TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap); + TDataPartitionTableResp dataPartitionTableResp = null; + for (int retry = 0; retry < 5; retry++) { + // Build new Client since it's unstable in Win8 environment + try (SyncConfigNodeIServiceClient configNodeClient = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + dataPartitionTableResp = + configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq); + if (dataPartitionTableResp != null) { + break; + } + } catch (Exception e) { + // Retry sometimes in order to avoid request timeout + LOGGER.error(e.getMessage()); + TimeUnit.SECONDS.sleep(1); + } + } + Assert.assertNotNull(dataPartitionTableResp); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + dataPartitionTableResp.getStatus().getCode()); + Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable()); + ConfigNodeTestUtils.checkDataPartitionTable( + database, + i, + i + 1, + i, + i + 1, + testTimePartitionInterval, + dataPartitionTableResp.getDataPartitionTable()); + } + + // Restart DataNode + EnvFactory.getEnv().startDataNode(1); + + // Check DataRegionGroups + TShowRegionResp resp = client.showRegion(new TShowRegionReq()); + Map<Integer, AtomicInteger> counter = new HashMap<>(); + resp.getRegionInfoList() + .forEach( + regionInfo -> + counter + .computeIfAbsent(regionInfo.getDataNodeId(), empty -> new AtomicInteger(0)) + .incrementAndGet()); + counter.forEach((dataNodeId, regionCount) -> Assert.assertEquals(3, regionCount.get())); + } + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java index 2e06fc26f1a..21b8b80bcbc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java @@ -25,6 +25,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.List; import java.util.Map; import java.util.Objects; @@ -37,6 +40,8 @@ import static java.util.Map.Entry.comparingByValue; /** Allocate Region Greedily */ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator { + private static final Logger LOGGER = LoggerFactory.getLogger(GreedyRegionGroupAllocator.class); + private static final AtomicInteger ZERO = new AtomicInteger(0); public GreedyRegionGroupAllocator() { @@ -62,6 +67,7 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator { Map<Integer, TDataNodeConfiguration> availableDataNodeMap, Map<Integer, Double> freeDiskSpaceMap, List<TRegionReplicaSet> allocatedRegionGroups) { + // Map<DataNodeId, Region count> Map<Integer, AtomicInteger> regionCounter = new ConcurrentHashMap<>(); allocatedRegionGroups.forEach( @@ -87,16 +93,29 @@ public class GreedyRegionGroupAllocator implements IRegionGroupAllocator { regionCounter.getOrDefault(dataNodeId, ZERO).get(), freeDiskSpaceMap.getOrDefault(dataNodeId, 0d)))); - return priorityMap.entrySet().stream() - .sorted( - comparingByValue( - (o1, o2) -> - !Objects.equals(o1.getLeft(), o2.getLeft()) - // Compare the first key(The number of Regions) by ascending order - ? o1.getLeft() - o2.getLeft() - // Compare the second key(The free disk space) by descending order - : (int) (o2.getRight() - o1.getRight()))) - .map(entry -> entry.getKey().deepCopy()) - .collect(Collectors.toList()); + // Sort weightList + List<TDataNodeLocation> result = + priorityMap.entrySet().stream() + .sorted( + comparingByValue( + (o1, o2) -> + !Objects.equals(o1.getLeft(), o2.getLeft()) + // Compare the first key(The number of Regions) by ascending order + ? o1.getLeft() - o2.getLeft() + // Compare the second key(The free disk space) by descending order + : (int) (o2.getRight() - o1.getRight()))) + .map(entry -> entry.getKey().deepCopy()) + .collect(Collectors.toList()); + + // Record weightList + for (TDataNodeLocation dataNodeLocation : result) { + LOGGER.info( + "[RegionGroupWeightList] DataNodeId: {}, RegionCount: {}, FreeDiskSpace: {}", + dataNodeLocation.getDataNodeId(), + priorityMap.get(dataNodeLocation).getLeft(), + priorityMap.get(dataNodeLocation).getRight()); + } + + return result; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java index d3f3dc35c98..7e47edb4fd2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java @@ -54,7 +54,9 @@ public class DataNodeHeartbeatCache extends BaseNodeCache { NodeStatus status = null; String statusReason = null; // TODO: Optimize judge logic - if (System.currentTimeMillis() - lastSendTime > HEARTBEAT_TIMEOUT_TIME) { + if (lastSample != null && NodeStatus.Removing.equals(lastSample.getStatus())) { + status = NodeStatus.Removing; + } else if (System.currentTimeMillis() - lastSendTime > HEARTBEAT_TIMEOUT_TIME) { status = NodeStatus.Unknown; } else if (lastSample != null) { status = lastSample.getStatus(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java index e55497bc5f2..67c221fa706 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionHeartbeatSample.java @@ -47,4 +47,9 @@ public class RegionHeartbeatSample { public RegionStatus getStatus() { return status; } + + public static RegionHeartbeatSample generateDefaultSample(RegionStatus status) { + long currentTime = System.currentTimeMillis(); + return new RegionHeartbeatSample(currentTime, currentTime, status); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index a629569470a..bdca8aa7959 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -504,7 +504,7 @@ public class DatabasePartitionTable { regionId); return; } - regionGroup.getReplicaSet().getDataNodeLocations().add(node); + regionGroup.addRegionLocation(node); } private void removeRegionOldLocation(TConsensusGroupId regionId, TDataNodeLocation node) { @@ -525,7 +525,7 @@ public class DatabasePartitionTable { regionId); return; } - regionGroup.getReplicaSet().getDataNodeLocations().remove(node); + regionGroup.removeRegionLocation(node); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java index a555c2349cd..81fea013c21 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/RegionGroup.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.persistence.partition; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -74,6 +75,16 @@ public class RegionGroup { return replicaSet.deepCopy(); } + public void addRegionLocation(TDataNodeLocation node) { + replicaSet.addToDataNodeLocations(node); + replicaSet.getDataNodeLocations().sort(TDataNodeLocation::compareTo); + } + + public void removeRegionLocation(TDataNodeLocation node) { + replicaSet.getDataNodeLocations().remove(node); + replicaSet.getDataNodeLocations().sort(TDataNodeLocation::compareTo); + } + /** @param deltaMap Map<TSeriesPartitionSlot, Delta TTimePartitionSlot Count> */ public void updateSlotCountMap(Map<TSeriesPartitionSlot, AtomicLong> deltaMap) { deltaMap.forEach( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index 371243bc197..ee5fcccd681 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -401,6 +401,17 @@ public class ConfigNodeProcedureEnv { NodeType.DataNode, dataNodeLocation.getDataNodeId(), NodeHeartbeatSample.generateDefaultSample(NodeStatus.Removing)); + // Force update RegionStatus to Removing + getPartitionManager() + .getAllReplicaSets(dataNodeLocation.getDataNodeId()) + .forEach( + replicaSet -> + getLoadManager() + .forceUpdateRegionGroupCache( + replicaSet.getRegionId(), + Collections.singletonMap( + dataNodeLocation.getDataNodeId(), + RegionHeartbeatSample.generateDefaultSample(RegionStatus.Removing)))); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java index d4cde051a84..e7cf43a2c63 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java @@ -58,6 +58,7 @@ import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS; import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS; public class DataNodeRemoveHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRemoveHandler.class); private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); @@ -402,7 +403,11 @@ public class DataNodeRemoveHandler { private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion( List<TDataNodeLocation> regionReplicaNodes) { - return configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream() + List<TDataNodeConfiguration> dataNodeConfigurations = + configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running); + // Randomly selected to ensure a basic load balancing + Collections.shuffle(dataNodeConfigurations); + return dataNodeConfigurations.stream() .map(TDataNodeConfiguration::getLocation) .filter(e -> !regionReplicaNodes.contains(e)) .findAny(); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 27c8bb0d271..055db6c0bb1 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -45,6 +45,7 @@ import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.config.ConsensusConfig; import org.apache.iotdb.consensus.config.RatisConfig; import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.consensus.exception.NodeReadOnlyException; import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException; @@ -73,6 +74,8 @@ import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.SnapshotManagementRequest; +import org.apache.ratis.protocol.exceptions.AlreadyExistsException; +import org.apache.ratis.protocol.exceptions.GroupMismatchException; import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.apache.ratis.protocol.exceptions.RaftException; import org.apache.ratis.protocol.exceptions.ResourceUnavailableException; @@ -392,19 +395,18 @@ class RatisConsensus implements IConsensus { @Override public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> peers) { RaftGroup group = buildRaftGroup(groupId, peers); - // add RaftPeer myself to this RaftGroup - return addNewGroupToServer(group, myself); - } - - private ConsensusGenericResponse addNewGroupToServer(RaftGroup group, RaftPeer server) { RaftClientReply reply; RaftGroup clientGroup = - group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(), server) : group; + group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(), myself) : group; try (RatisClient client = getRaftClient(clientGroup)) { - reply = client.getRaftClient().getGroupManagementApi(server.getId()).add(group); + reply = client.getRaftClient().getGroupManagementApi(myself.getId()).add(group); if (!reply.isSuccess()) { return failed(new RatisRequestFailedException(reply.getException())); } + } catch (AlreadyExistsException e) { + return ConsensusGenericResponse.newBuilder() + .setException(new ConsensusGroupAlreadyExistException(groupId)) + .build(); } catch (Exception e) { return failed(new RatisRequestFailedException(e)); } @@ -437,6 +439,10 @@ class RatisConsensus implements IConsensus { if (!reply.isSuccess()) { return failed(new RatisRequestFailedException(reply.getException())); } + } catch (GroupMismatchException e) { + return ConsensusGenericResponse.newBuilder() + .setException(new ConsensusGroupNotExistException(groupId)) + .build(); } catch (IOException e) { return failed(new RatisRequestFailedException(e)); } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java index 245474c10e5..6d6b1742603 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java @@ -29,7 +29,7 @@ import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.config.RatisConfig; -import org.apache.iotdb.consensus.exception.RatisRequestFailedException; +import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; import org.apache.ratis.util.TimeDuration; import org.junit.After; @@ -117,7 +117,7 @@ public class RatisConsensusTest { ConsensusGenericResponse resp = servers.get(0).createPeer(group.getGroupId(), original); Assert.assertFalse(resp.isSuccess()); - Assert.assertTrue(resp.getException() instanceof RatisRequestFailedException); + Assert.assertTrue(resp.getException() instanceof ConsensusGroupAlreadyExistException); // add 2 members servers.get(1).createPeer(group.getGroupId(), Collections.emptyList()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 33c6cfcee4d..ddddd2f783d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -51,7 +51,8 @@ import org.apache.iotdb.commons.udf.UDFInformation; import org.apache.iotdb.commons.udf.service.UDFManagementService; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; -import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException; +import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; +import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.db.auth.AuthorizerManager; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; @@ -1417,7 +1418,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface ConsensusGenericResponse response = DataRegionConsensusImpl.getInstance().deletePeer(consensusGroupId); if (!response.isSuccess() - && !(response.getException() instanceof PeerNotInConsensusGroupException)) { + && !(response.getException() instanceof ConsensusGroupNotExistException)) { return RpcUtils.getStatus( TSStatusCode.DELETE_REGION_ERROR, response.getException().getMessage()); } @@ -1426,7 +1427,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface ConsensusGenericResponse response = SchemaRegionConsensusImpl.getInstance().deletePeer(consensusGroupId); if (!response.isSuccess() - && !(response.getException() instanceof PeerNotInConsensusGroupException)) { + && !(response.getException() instanceof ConsensusGroupNotExistException)) { return RpcUtils.getStatus( TSStatusCode.DELETE_REGION_ERROR, response.getException().getMessage()); } @@ -1759,7 +1760,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface } else { resp = SchemaRegionConsensusImpl.getInstance().createPeer(regionId, peers); } - if (!resp.isSuccess()) { + if (!resp.isSuccess() + && !(resp.getException() instanceof ConsensusGroupAlreadyExistException)) { LOGGER.warn( "{}, CreateNewRegionPeer error, peers: {}, regionId: {}, errorMessage", REGION_MIGRATE_PROCESS,
