This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch region_migration in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 15f80efaefbd5a8345a0bfaa721ad7fbdb95a8d7 Author: liyuheng <[email protected]> AuthorDate: Fri Jan 5 10:55:14 2024 +0800 init IT frame, and select coordinator deterministically. include below commits: IT pass: select coordinator deterministically confignode RegionMigrateProcedure stage finished breakpoint select coordinator deterministically IT improve adding stopForcibly IT file function improve confignode RegionMigrateProcedure stage finished breakpoint add function IT basic frame done save --- .../iotdb/it/env/cluster/env/AbstractEnv.java | 20 ++ .../it/env/cluster/node/AbstractNodeWrapper.java | 16 +- .../iotdb/it/env/remote/env/RemoteServerEnv.java | 6 + .../java/org/apache/iotdb/itbase/env/BaseEnv.java | 3 + .../apache/iotdb/itbase/env/BaseNodeWrapper.java | 2 + .../it/IoTDBRegionMigrateReliabilityIT.java | 313 +++++++++++++++++++++ .../iotdb/confignode/manager/ProcedureManager.java | 53 +++- .../procedure/env/ConfigNodeProcedureEnv.java | 19 +- .../procedure/env/DataNodeRemoveHandler.java | 45 +-- .../impl/node/RemoveDataNodeProcedure.java | 14 +- .../impl/statemachine/RegionMigrateProcedure.java | 23 +- .../statemachine/RegionMigrateProcedureTest.java | 16 +- .../apache/iotdb/consensus/iot/IoTConsensus.java | 1 + .../org/apache/iotdb/commons/utils/FileUtils.java | 10 + 14 files changed, 483 insertions(+), 58 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 75e7cfb2572..257d813982c 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -28,7 +28,9 @@ import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; import 
org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; import org.apache.iotdb.isession.ISession; import org.apache.iotdb.isession.SessionConfig; import org.apache.iotdb.isession.pool.ISessionPool; @@ -930,4 +932,22 @@ public abstract class AbstractEnv implements BaseEnv { public String getLibPath() { return TEMPLATE_NODE_LIB_PATH; } + + @Override + public Optional<DataNodeWrapper> dataNodeIdToWrapper(int nodeId) { + try (SyncConfigNodeIServiceClient leaderClient = + (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) { + TShowDataNodesResp resp = leaderClient.showDataNodes(); + for (TDataNodeInfo info : resp.getDataNodesInfoList()) { + if (info.dataNodeId == nodeId) { + return dataNodeWrapperList.stream() + .filter(dataNodeWrapper -> dataNodeWrapper.getPort() == info.rpcPort) + .findAny(); + } + } + return Optional.empty(); + } catch (Exception e) { + return Optional.empty(); + } + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index 517d7c152fb..dbc43f8fc41 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -486,6 +486,20 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper { } } + @Override + public void stopForcibly() { + if (this.instance == null) { + return; + } + try { + this.instance.destroyForcibly().waitFor(5, TimeUnit.SECONDS); + logger.info("Node {} has been successfully forcibly stopped", nodePort); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Waiting node to shutdown 
error. %s", e); + } + } + @Override public final String getIp() { return this.nodeAddress; @@ -540,7 +554,7 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper { return clusterIndex + HYPHEN + outputCommonConfig.getClusterConfigStr(); } - protected String getNodePath() { + public String getNodePath() { return System.getProperty(USER_DIR) + File.separator + TARGET + File.separator + getId(); } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java index 25a9ac319ca..000d13d6b7c 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java @@ -49,6 +49,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import static org.apache.iotdb.jdbc.Config.VERSION; @@ -353,4 +354,9 @@ public class RemoteServerEnv implements BaseEnv { public String getLibPath() { throw new UnsupportedOperationException(); } + + @Override + public Optional<DataNodeWrapper> dataNodeIdToWrapper(int nodeId) { + throw new UnsupportedOperationException(); + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index 120b0c1a810..fcfd09d4516 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -38,6 +38,7 @@ import java.net.URL; import java.sql.Connection; import java.sql.SQLException; import java.util.List; +import java.util.Optional; public interface BaseEnv { @@ -233,4 +234,6 @@ public interface BaseEnv { String getToolsPath(); String getLibPath(); + + Optional<DataNodeWrapper> dataNodeIdToWrapper(int nodeId); } 
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java index 165ed3fd62f..73cd372b634 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseNodeWrapper.java @@ -31,6 +31,8 @@ public interface BaseNodeWrapper { void stop(); + void stopForcibly(); + String getIp(); int getPort(); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java new file mode 100644 index 00000000000..ffdda9a088a --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.it; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.confignode.procedure.state.RegionTransitionState; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; +import org.apache.iotdb.it.env.EnvFactory; + +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +public class IoTDBRegionMigrateReliabilityIT { + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBRegionMigrateReliabilityIT.class); + private static final String INSERTION = + "INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 10.1, 20.7)"; + private static final String SHOW_REGIONS = "show regions"; + private static final String SHOW_DATANODES = "show datanodes"; + private static final String REGION_MIGRATE_COMMAND_FORMAT = "migrate region %d from %d to %d"; + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + 
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS); + } + + @After + public void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + // region Normal tests + + @Test + public void normal1C2DTest() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(1) + .setSchemaReplicationFactor(1); + EnvFactory.getEnv().initClusterEnvironment(1, 2); + + try (final Connection connection = EnvFactory.getEnv().getConnection(); + final Statement statement = connection.createStatement()) { + + statement.execute(INSERTION); + + ResultSet result = statement.executeQuery(SHOW_REGIONS); + Map<Integer, Set<Integer>> regionMap = getRegionMap(result); + + result = statement.executeQuery(SHOW_DATANODES); + Set<Integer> dataNodeSet = new HashSet<>(); + while (result.next()) { + dataNodeSet.add(result.getInt(ColumnHeaderConstant.NODE_ID)); + } + + final int selectedRegion = selectRegion(regionMap); + final int originalDataNode = selectOriginalDataNode(regionMap, selectedRegion); + final int destDataNode = selectDestDataNode(dataNodeSet, regionMap, selectedRegion); + + statement.execute(regionMigrateCommand(selectedRegion, originalDataNode, destDataNode)); + + awaitUntilSuccess(statement, selectedRegion, originalDataNode, destDataNode); + + checkRegionFileClear(originalDataNode); + + LOGGER.info("test pass"); + } + } + + @Test + public void normal3C3DTest() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(2) + .setSchemaReplicationFactor(3); + EnvFactory.getEnv().initClusterEnvironment(3, 3); + + try (final Connection connection = EnvFactory.getEnv().getConnection(); + final Statement statement = connection.createStatement(); + final SyncConfigNodeIServiceClient configClient = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + + statement.execute(INSERTION); + + ResultSet result = 
statement.executeQuery(SHOW_REGIONS); + Map<Integer, Set<Integer>> regionMap = getRegionMap(result); + + result = statement.executeQuery(SHOW_DATANODES); + Set<Integer> dataNodeSet = new HashSet<>(); + while (result.next()) { + dataNodeSet.add(result.getInt(ColumnHeaderConstant.NODE_ID)); + } + + final int selectedRegion = selectRegion(regionMap); + final int originalDataNode = selectOriginalDataNode(regionMap, selectedRegion); + final int destDataNode = selectDestDataNode(dataNodeSet, regionMap, selectedRegion); + + // set breakpoint + HashMap<String, Runnable> keywordAction = new HashMap<>(); + Arrays.stream(RegionTransitionState.values()) + .forEach( + state -> + keywordAction.put( + String.valueOf(state), () -> LOGGER.info(String.valueOf(state)))); + ExecutorService service = IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT"); + LOGGER.info("breakpoint setting..."); + service.submit(() -> logBreakpointMonitor(0, keywordAction)); + service.submit(() -> logBreakpointMonitor(1, keywordAction)); + service.submit(() -> logBreakpointMonitor(2, keywordAction)); + LOGGER.info("breakpoint set"); + + statement.execute(regionMigrateCommand(selectedRegion, originalDataNode, destDataNode)); + + awaitUntilSuccess(statement, selectedRegion, originalDataNode, destDataNode); + + checkRegionFileClear(originalDataNode); + + LOGGER.info("test pass"); + } + } + + // endregion + + // region ConfigNode crash tests + @Test + public void cnCrashDuringPreCheck() {} + + @Test + public void cnCrashDuringCreatePeer() {} + + @Test + public void cnCrashDuringAddPeer() {} + + // TODO: other cn crash test + + // endregion + + // region DataNode crash tests + + // endregion + + // region Helpers + + /** + * Monitor the node's log and do something. 
+ * + * @param nodeIndex index of the ConfigNode whose log is tailed + * @param keywordAction Map<keyword, action>; each action runs at most once + */ + private static void logBreakpointMonitor(int nodeIndex, HashMap<String, Runnable> keywordAction) { + ProcessBuilder builder = + new ProcessBuilder( + "tail", + "-f", + EnvFactory.getEnv().getConfigNodeWrapper(nodeIndex).getNodePath() + + File.separator + + "logs" + + File.separator + + "log_confignode_all.log"); + builder.redirectErrorStream(true); // merge stderr into stdout + + try { + Process process = builder.start(); // start the tail process + // read the command's output line by line + try (BufferedReader reader = + new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + Set<String> detected = new HashSet<>(); + String finalLine = line; + keywordAction + .keySet() + .forEach( + k -> { + if (finalLine.contains(k)) { + detected.add(k); + } + }); + detected.forEach( + k -> { + keywordAction.get(k).run(); + // + // EnvFactory.getEnv().getConfigNodeWrapper(nodeIndex).stopForcibly(); + keywordAction.remove(k); + }); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static String regionMigrateCommand(int who, int from, int to) { + return String.format(REGION_MIGRATE_COMMAND_FORMAT, who, from, to); + } + + private static Map<Integer, Set<Integer>> getRegionMap(ResultSet showRegionsResult) + throws SQLException { + Map<Integer, Set<Integer>> regionMap = new HashMap<>(); + while (showRegionsResult.next()) { + if (String.valueOf(TConsensusGroupType.DataRegion) + .equals(showRegionsResult.getString(ColumnHeaderConstant.TYPE))) { + int region = showRegionsResult.getInt(ColumnHeaderConstant.REGION_ID); + int dataNode = showRegionsResult.getInt(ColumnHeaderConstant.DATA_NODE_ID); + regionMap.putIfAbsent(region, new HashSet<>()); + regionMap.get(region).add(dataNode); + } + } + return regionMap; + } + + private static int selectRegion(Map<Integer, Set<Integer>> regionMap) { + return regionMap.keySet().stream().findAny().orElseThrow(() ->
new RuntimeException("gg")); + } + + private static int selectOriginalDataNode( + Map<Integer, Set<Integer>> regionMap, int selectedRegion) { + return regionMap.get(selectedRegion).stream() + .findAny() + .orElseThrow(() -> new RuntimeException("gg")); + } + + private static int selectDestDataNode( + Set<Integer> dataNodeSet, Map<Integer, Set<Integer>> regionMap, int selectedRegion) { + return dataNodeSet.stream() + .filter(dataNodeId -> !regionMap.get(selectedRegion).contains(dataNodeId)) + .findAny() + .orElseThrow(() -> new RuntimeException("gg")); + } + + private static void awaitUntilSuccess( + Statement statement, int selectedRegion, int originalDataNode, int destDataNode) { + Awaitility.await() + .atMost(1, TimeUnit.MINUTES) + .until( + () -> { + Map<Integer, Set<Integer>> newRegionMap = + getRegionMap(statement.executeQuery(SHOW_REGIONS)); + Set<Integer> dataNodes = newRegionMap.get(selectedRegion); + return !dataNodes.contains(originalDataNode) && dataNodes.contains(destDataNode); + }); + } + + /** Check whether the original DataNode's region file has been deleted. 
*/ + private static void checkRegionFileClear(int dataNode) { + String nodePath = EnvFactory.getEnv().dataNodeIdToWrapper(dataNode).get().getNodePath(); + File originalRegionDir = + new File( + nodePath + + File.separator + + IoTDBConstant.DATA_FOLDER_NAME + + File.separator + + "datanode" + + File.separator + + IoTDBConstant.CONSENSUS_FOLDER_NAME + + File.separator + + "data_region"); + Assert.assertTrue(originalRegionDir.isDirectory()); + Assert.assertEquals(0, Objects.requireNonNull(originalRegionDir.listFiles()).length); + } + + // endregion +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 4a3a2547cea..3db3ece81f2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -51,6 +51,7 @@ import org.apache.iotdb.confignode.procedure.Procedure; import org.apache.iotdb.confignode.procedure.ProcedureExecutor; import org.apache.iotdb.confignode.procedure.ProcedureMetrics; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler; import org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure; import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure; import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure; @@ -113,10 +114,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; +import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS; + public class ProcedureManager { private static final Logger LOGGER = 
LoggerFactory.getLogger(ProcedureManager.class); @@ -513,6 +517,7 @@ public class ProcedureManager { } public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) { + // get region group id TConsensusGroupId regionGroupId; if (configManager .getPartitionManager() @@ -540,12 +545,13 @@ public class ProcedureManager { return status; } - TDataNodeLocation originalDataNode = + // find original dn and dest dn + final TDataNodeLocation originalDataNode = configManager .getNodeManager() .getRegisteredDataNode(migrateRegionReq.getFromId()) .getLocation(); - TDataNodeLocation destDataNode = + final TDataNodeLocation destDataNode = configManager .getNodeManager() .getRegisteredDataNode(migrateRegionReq.getToId()) @@ -560,7 +566,8 @@ public class ProcedureManager { "Submit RegionMigrateProcedure failed, because no original DataNode " + migrateRegionReq.getFromId()); return status; - } else if (destDataNode == null) { + } + if (destDataNode == null) { LOGGER.warn( "Submit RegionMigrateProcedure failed, because no target DataNode {}", migrateRegionReq.getToId()); @@ -569,8 +576,9 @@ public class ProcedureManager { "Submit RegionMigrateProcedure failed, because no target DataNode " + migrateRegionReq.getToId()); return status; - } else if (configManager.getPartitionManager() - .getAllReplicaSets(originalDataNode.getDataNodeId()).stream() + } + if (configManager.getPartitionManager().getAllReplicaSets(originalDataNode.getDataNodeId()) + .stream() .noneMatch(replicaSet -> replicaSet.getRegionId().equals(regionGroupId))) { LOGGER.warn( "Submit RegionMigrateProcedure failed, because the original DataNode {} doesn't contain Region {}", @@ -583,8 +591,8 @@ public class ProcedureManager { + " doesn't contain Region " + migrateRegionReq.getRegionId()); return status; - } else if (configManager.getPartitionManager().getAllReplicaSets(destDataNode.getDataNodeId()) - .stream() + } + if (configManager.getPartitionManager().getAllReplicaSets(destDataNode.getDataNodeId()).stream() 
.anyMatch(replicaSet -> replicaSet.getRegionId().equals(regionGroupId))) { LOGGER.warn( "Submit RegionMigrateProcedure failed, because the target DataNode {} already contains Region {}", @@ -629,8 +637,37 @@ public class ProcedureManager { + " is ReadOnly or Unknown."); return status; } + + // select coordinator for adding peer + DataNodeRemoveHandler handler = new DataNodeRemoveHandler(configManager); + Optional<TDataNodeLocation> selectedDataNode = + handler.filterDataNodeWithOtherRegionReplica(regionGroupId, destDataNode); + if (!selectedDataNode.isPresent()) { + LOGGER.warn( + "{}, There are no other DataNodes could be selected to perform the add peer process, " + + "please check RegionGroup: {} by show regions sql command", + REGION_MIGRATE_PROCESS, + regionGroupId); + TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); + status.setMessage( + "There are no other DataNodes could be selected to perform the add peer process, " + + "please check by show regions sql command"); + return status; + } + final TDataNodeLocation coordinatorForAddPeer = selectedDataNode.get(); + + // Select coordinator for removing peer + // For now, destDataNode temporarily acts as the coordinatorForRemovePeer + final TDataNodeLocation coordinatorForRemovePeer = destDataNode; + + // finally, submit procedure this.executor.submitProcedure( - new RegionMigrateProcedure(regionGroupId, originalDataNode, destDataNode)); + new RegionMigrateProcedure( + regionGroupId, + originalDataNode, + destDataNode, + coordinatorForAddPeer, + coordinatorForRemovePeer)); LOGGER.info( "Submit RegionMigrateProcedure successfully, Region: {}, From: {}, To: {}", migrateRegionReq.getRegionId(), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index e919f127d7f..518f3cc75c7 100644 --- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -205,14 +205,17 @@ public class ConfigNodeProcedureEnv { } public boolean doubleCheckReplica(TDataNodeLocation removedDatanode) { - return getNodeManager() - .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.ReadOnly) - .size() - - Boolean.compare( - getLoadManager().getNodeStatus(removedDatanode.getDataNodeId()) - != NodeStatus.Unknown, - false) - >= NodeInfo.getMinimumDataNode(); + final int runningOrReadOnlyDataNodeNum = + getNodeManager() + .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.ReadOnly) + .size(); + int dataNodeNumAfterRemoving; + if (getLoadManager().getNodeStatus(removedDatanode.getDataNodeId()) != NodeStatus.Unknown) { + dataNodeNumAfterRemoving = runningOrReadOnlyDataNodeNum - 1; + } else { + dataNodeNumAfterRemoving = runningOrReadOnlyDataNodeNum; + } + return dataNodeNumAfterRemoving >= NodeInfo.getMinimumDataNode(); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java index d7029d3c5bf..78218dea7db 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java @@ -232,41 +232,24 @@ public class DataNodeRemoveHandler { * @param regionId region id * @return TSStatus */ - public TSStatus addRegionPeer(TDataNodeLocation destDataNode, TConsensusGroupId regionId) { + public TSStatus addRegionPeer( + TDataNodeLocation destDataNode, TConsensusGroupId regionId, TDataNodeLocation coordinator) { TSStatus status; - // Here we pick the DataNode who contains one of the 
RegionReplica of the specified - // ConsensusGroup except the new one - // in order to notify the origin ConsensusGroup that another peer is created and demand to join - Optional<TDataNodeLocation> selectedDataNode = - filterDataNodeWithOtherRegionReplica(regionId, destDataNode); - if (!selectedDataNode.isPresent()) { - LOGGER.warn( - "{}, There are no other DataNodes could be selected to perform the add peer process, " - + "please check RegionGroup: {} by show regions sql command", - REGION_MIGRATE_PROCESS, - regionId); - status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); - status.setMessage( - "There are no other DataNodes could be selected to perform the add peer process, " - + "please check by show regions sql command"); - return status; - } - // Send addRegionPeer request to the selected DataNode, // destDataNode is where the new RegionReplica is created TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, destDataNode); status = SyncDataNodeClientPool.getInstance() .sendSyncRequestToDataNodeWithRetry( - selectedDataNode.get().getInternalEndPoint(), + coordinator.getInternalEndPoint(), maintainPeerReq, DataNodeRequestType.ADD_REGION_PEER); LOGGER.info( "{}, Send action addRegionPeer finished, regionId: {}, rpcDataNode: {}, destDataNode: {}", REGION_MIGRATE_PROCESS, regionId, - getIdWithRpcEndpoint(selectedDataNode.get()), + getIdWithRpcEndpoint(coordinator), getIdWithRpcEndpoint(destDataNode)); return status; } @@ -283,33 +266,23 @@ public class DataNodeRemoveHandler { */ public TSStatus removeRegionPeer( TDataNodeLocation originalDataNode, - TDataNodeLocation destDataNode, - TConsensusGroupId regionId) { + TConsensusGroupId regionId, + TDataNodeLocation coordinator) { TSStatus status; - TDataNodeLocation rpcClientDataNode; - - // Here we pick the DataNode who contains one of the RegionReplica of the specified - // ConsensusGroup except the origin one - // in order to notify the new ConsensusGroup that the origin peer should 
secede now - // If the selectedDataNode equals null, we choose the destDataNode to execute the method - Optional<TDataNodeLocation> selectedDataNode = - filterDataNodeWithOtherRegionReplica(regionId, originalDataNode); - rpcClientDataNode = selectedDataNode.orElse(destDataNode); - // Send removeRegionPeer request to the rpcClientDataNode TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, originalDataNode); status = SyncDataNodeClientPool.getInstance() .sendSyncRequestToDataNodeWithRetry( - rpcClientDataNode.getInternalEndPoint(), + coordinator.getInternalEndPoint(), maintainPeerReq, DataNodeRequestType.REMOVE_REGION_PEER); LOGGER.info( "{}, Send action removeRegionPeer finished, regionId: {}, rpcDataNode: {}", REGION_MIGRATE_PROCESS, regionId, - getIdWithRpcEndpoint(rpcClientDataNode)); + getIdWithRpcEndpoint(coordinator)); return status; } @@ -646,7 +619,7 @@ public class DataNodeRemoveHandler { * @return A DataNodeLocation that contains other RegionReplica and different from the * filterLocation */ - private Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica( + public Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica( TConsensusGroupId regionId, TDataNodeLocation filterLocation) { List<TDataNodeLocation> regionLocations = findRegionLocations(regionId); if (regionLocations.isEmpty()) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java index 79640e7739c..103ab321fd8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java @@ -127,9 +127,21 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod regionId -> { 
TDataNodeLocation destDataNode = env.getDataNodeRemoveHandler().findDestDataNode(regionId); + // TODO: need to improve the coordinator selection method here, maybe through load + // balancing and other means. + final TDataNodeLocation coordinatorForAddPeer = + env.getDataNodeRemoveHandler() + .filterDataNodeWithOtherRegionReplica(regionId, destDataNode) + .orElse(removedDataNode); + final TDataNodeLocation coordinatorForRemovePeer = destDataNode; if (destDataNode != null) { RegionMigrateProcedure regionMigrateProcedure = - new RegionMigrateProcedure(regionId, removedDataNode, destDataNode); + new RegionMigrateProcedure( + regionId, + removedDataNode, + destDataNode, + coordinatorForAddPeer, + coordinatorForRemovePeer); addChildProcedure(regionMigrateProcedure); LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId); } else { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java index 56369316b7d..30555f33fd5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; +import static org.apache.iotdb.commons.utils.FileUtils.logBreakpoint; import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS; import static org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler.getIdWithRpcEndpoint; import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS; @@ -63,6 +64,9 @@ public class RegionMigrateProcedure private TDataNodeLocation destDataNode; + private TDataNodeLocation coordinatorForAddPeer; + private 
TDataNodeLocation coordinatorForRemovePeer; + private boolean migrateSuccess = true; private String migrateResult = ""; @@ -74,11 +78,15 @@ public class RegionMigrateProcedure public RegionMigrateProcedure( TConsensusGroupId consensusGroupId, TDataNodeLocation originalDataNode, - TDataNodeLocation destDataNode) { + TDataNodeLocation destDataNode, + TDataNodeLocation coordinatorForAddPeer, + TDataNodeLocation coordinatorForRemovePeer) { super(); this.consensusGroupId = consensusGroupId; this.originalDataNode = originalDataNode; this.destDataNode = destDataNode; + this.coordinatorForAddPeer = coordinatorForAddPeer; + this.coordinatorForRemovePeer = coordinatorForRemovePeer; } @Override @@ -91,32 +99,39 @@ public class RegionMigrateProcedure try { switch (state) { case REGION_MIGRATE_PREPARE: + logBreakpoint(state.name()); setNextState(RegionTransitionState.CREATE_NEW_REGION_PEER); break; case CREATE_NEW_REGION_PEER: handler.createNewRegionPeer(consensusGroupId, destDataNode); + logBreakpoint(state.name()); setNextState(RegionTransitionState.ADD_REGION_PEER); break; case ADD_REGION_PEER: - tsStatus = handler.addRegionPeer(destDataNode, consensusGroupId); + tsStatus = handler.addRegionPeer(destDataNode, consensusGroupId, coordinatorForAddPeer); if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { waitForOneMigrationStepFinished(consensusGroupId, state); } else { throw new ProcedureException("ADD_REGION_PEER executed failed in DataNode"); } + logBreakpoint(state.name()); setNextState(RegionTransitionState.CHANGE_REGION_LEADER); break; case CHANGE_REGION_LEADER: handler.changeRegionLeader(consensusGroupId, originalDataNode, destDataNode); + logBreakpoint(state.name()); setNextState(RegionTransitionState.REMOVE_REGION_PEER); break; case REMOVE_REGION_PEER: - tsStatus = handler.removeRegionPeer(originalDataNode, destDataNode, consensusGroupId); + tsStatus = + handler.removeRegionPeer( + originalDataNode, consensusGroupId, coordinatorForRemovePeer); if 
(tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { waitForOneMigrationStepFinished(consensusGroupId, state); } else { throw new ProcedureException("REMOVE_REGION_PEER executed failed in DataNode"); } + logBreakpoint(state.name()); setNextState(RegionTransitionState.DELETE_OLD_REGION_PEER); break; case DELETE_OLD_REGION_PEER: @@ -124,12 +139,14 @@ public class RegionMigrateProcedure if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { waitForOneMigrationStepFinished(consensusGroupId, state); } + logBreakpoint(state.name()); // Remove consensus group after a node stop, which will be failed, but we will // continuously execute. setNextState(RegionTransitionState.UPDATE_REGION_LOCATION_CACHE); break; case UPDATE_REGION_LOCATION_CACHE: handler.updateRegionLocationCache(consensusGroupId, originalDataNode, destDataNode); + logBreakpoint(state.name()); return Flow.NO_MORE_STATE; } } catch (Exception e) { diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedureTest.java index 45967cd11d9..521b04ecf5c 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedureTest.java @@ -52,7 +52,21 @@ public class RegionMigrateProcedureTest { new TEndPoint("127.0.0.1", 6), new TEndPoint("127.0.0.1", 7), new TEndPoint("127.0.0.1", 8), - new TEndPoint("127.0.0.1", 9))); + new TEndPoint("127.0.0.1", 9)), + new TDataNodeLocation( + 11, + new TEndPoint("127.0.0.1", 10), + new TEndPoint("127.0.0.1", 11), + new TEndPoint("127.0.0.1", 12), + new TEndPoint("127.0.0.1", 13), + new TEndPoint("127.0.0.1", 14)), + new TDataNodeLocation( + 15, + new TEndPoint("127.0.0.1", 15), + new 
TEndPoint("127.0.0.1", 16), + new TEndPoint("127.0.0.1", 17), + new TEndPoint("127.0.0.1", 18), + new TEndPoint("127.0.0.1", 19))); try (PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index babe939393f..94e116882e0 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -279,6 +279,7 @@ public class IoTConsensus implements IConsensus { IoTConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); + // TODO:阻写? if (impl.getConfiguration().contains(peer)) { throw new PeerAlreadyInConsensusGroupException(groupId, peer); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java index 015621b0bfe..bc3d2229dfb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java @@ -34,6 +34,7 @@ import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.Objects; +import java.util.concurrent.TimeUnit; public class FileUtils { private static final Logger LOGGER = LoggerFactory.getLogger(FileUtils.class); @@ -181,4 +182,13 @@ public class FileUtils { } return file; } + + public static void logBreakpoint(String logContent) { + LOGGER.info("breakpoint:{}", logContent); + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } }
