This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d46dd660a67 Region migration related work (#12246)
d46dd660a67 is described below
commit d46dd660a67086285f1095b0c53d4e021a4aa0c4
Author: Li Yu Heng <[email protected]>
AuthorDate: Sat Mar 30 10:58:47 2024 +0800
Region migration related work (#12246)
---
.../iotdb/it/env/cluster/env/AbstractEnv.java | 15 +
.../it/env/cluster/node/AbstractNodeWrapper.java | 11 +
.../iotdb/it/env/remote/env/RemoteServerEnv.java | 10 +
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 4 +
.../IoTDBRegionMigrateReliabilityITFramework.java} | 431 ++++++++++++---------
.../CoordinatorRemoveRemotePeerCrashIT.java | 51 +++
.../IoTDBRegionMigrateDataNodeCrashIT.java | 72 ++++
.../pass/IoTDBRegionMigrateConfigNodeCrashIT.java | 92 +++++
.../pass/IoTDBRegionMigrateNormalIT.java | 26 +-
.../pass/IoTDBRegionMigrateOtherIT.java | 45 +++
.../iotdb/confignode/manager/node/NodeManager.java | 7 +
.../impl/region/AddRegionPeerProcedure.java | 4 +
.../impl/region/RegionMigrateProcedure.java | 2 +
.../impl/region/RemoveRegionPeerProcedure.java | 4 +
.../apache/iotdb/consensus/iot/IoTConsensus.java | 58 ++-
.../consensus/iot/IoTConsensusServerImpl.java | 69 ++--
.../service/IoTConsensusRPCServiceProcessor.java | 5 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 25 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 66 ++++
.../apache/iotdb/commons/conf/CommonConfig.java | 16 +
.../apache/iotdb/commons/conf/IoTDBConstant.java | 5 +
.../org/apache/iotdb/commons/utils/FileUtils.java | 10 -
.../utils/{ => KillPoint}/DataNodeKillPoints.java | 13 +-
.../IoTConsensusRemovePeerKillPoints.java} | 13 +-
.../iotdb/commons/utils/KillPoint/KillPoint.java | 80 ++++
.../NeverTriggeredKillPoint.java} | 10 +-
.../src/main/thrift/confignode.thrift | 1 +
27 files changed, 877 insertions(+), 268 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 766f2e0c617..68be2f40c51 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -85,6 +85,8 @@ public abstract class AbstractEnv implements BaseEnv {
protected long startTime;
protected int retryCount = 30;
private IClientManager<TEndPoint, SyncConfigNodeIServiceClient>
clientManager;
+ private List<String> configNodeKillPoints = new ArrayList<>();
+ private List<String> dataNodeKillPoints = new ArrayList<>();
/**
* This config object stores the properties set by developers during the
test. It will be cleared
@@ -168,6 +170,7 @@ public abstract class AbstractEnv implements BaseEnv {
(MppCommonConfig) clusterConfig.getConfigNodeCommonConfig(),
(MppJVMConfig) clusterConfig.getConfigNodeJVMConfig());
seedConfigNodeWrapper.createLogDir();
+ seedConfigNodeWrapper.setKillPoints(configNodeKillPoints);
seedConfigNodeWrapper.start();
String seedConfigNode = seedConfigNodeWrapper.getIpAndPortString();
this.configNodeWrapperList.add(seedConfigNodeWrapper);
@@ -202,6 +205,7 @@ public abstract class AbstractEnv implements BaseEnv {
(MppCommonConfig) clusterConfig.getConfigNodeCommonConfig(),
(MppJVMConfig) clusterConfig.getConfigNodeJVMConfig());
configNodeWrapper.createLogDir();
+ configNodeWrapper.setKillPoints(configNodeKillPoints);
configNodesDelegate.addRequest(
() -> {
configNodeWrapper.start();
@@ -236,6 +240,7 @@ public abstract class AbstractEnv implements BaseEnv {
(MppCommonConfig) clusterConfig.getDataNodeCommonConfig(),
(MppJVMConfig) clusterConfig.getDataNodeJVMConfig());
dataNodeWrapper.createLogDir();
+ dataNodeWrapper.setKillPoints(dataNodeKillPoints);
dataNodesDelegate.addRequest(
() -> {
dataNodeWrapper.start();
@@ -1032,4 +1037,14 @@ public abstract class AbstractEnv implements BaseEnv {
return Optional.empty();
}
}
+
+ @Override
+ public void registerConfigNodeKillPoints(List<String> killPoints) {
+ this.configNodeKillPoints = killPoints;
+ }
+
+ @Override
+ public void registerDataNodeKillPoints(List<String> killPoints) {
+ this.dataNodeKillPoints = killPoints;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index 4e6372d5b8d..a517fc95829 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.it.env.cluster.node;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.config.MppBaseConfig;
import org.apache.iotdb.it.env.cluster.config.MppCommonConfig;
@@ -125,6 +126,7 @@ public abstract class AbstractNodeWrapper implements
BaseNodeWrapper {
private int nodePort;
private int metricPort;
private long startTime;
+ private List<String> killPoints = new ArrayList<>();
/**
* Mutable properties are always hardcoded default values to make the
cluster be set up
@@ -453,6 +455,7 @@ public abstract class AbstractNodeWrapper implements
BaseNodeWrapper {
"-Xmx" + jvmConfig.getMaxHeapSize() + "m",
"-XX:MaxDirectMemorySize=" + jvmConfig.getMaxDirectMemorySize()
+ "m",
"-Djdk.nio.maxCachedBufferSize=262144",
+ "-D" + IoTDBConstant.INTEGRATION_TEST_KILL_POINTS + "=" +
killPoints.toString(),
"-cp",
server_node_lib_path));
addStartCmdParams(startCmd);
@@ -640,6 +643,14 @@ public abstract class AbstractNodeWrapper implements
BaseNodeWrapper {
return testClassName + "_" + testMethodName;
}
+ public void setKillPoints(List<String> killPoints) {
+ this.killPoints = killPoints;
+ }
+
+ private String getKillPoints() {
+ return killPoints.toString();
+ }
+
/* Abstract methods, which must be implemented in ConfigNode and DataNode. */
protected abstract void reloadMutableFields();
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
index 9dd357dbae6..eae42bd7a23 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java
@@ -372,4 +372,14 @@ public class RemoteServerEnv implements BaseEnv {
public Optional<DataNodeWrapper> dataNodeIdToWrapper(int nodeId) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void registerConfigNodeKillPoints(List<String> killPoints) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void registerDataNodeKillPoints(List<String> killPoints) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 38b2383baa8..965e2254e84 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -255,4 +255,8 @@ public interface BaseEnv {
String getLibPath();
Optional<DataNodeWrapper> dataNodeIdToWrapper(int nodeId);
+
+ void registerConfigNodeKillPoints(List<String> killPoints);
+
+ void registerDataNodeKillPoints(List<String> killPoints);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionMigrateReliabilityITFramework.java
similarity index 50%
rename from
integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
rename to
integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionMigrateReliabilityITFramework.java
index 9073bcc1617..3f33c857fcd 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionMigrateReliabilityITFramework.java
@@ -17,30 +17,26 @@
* under the License.
*/
-package org.apache.iotdb.confignode.it;
+package org.apache.iotdb.confignode.it.regionmigration;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.utils.DataNodeKillPoints;
-import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
-import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
-import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
import org.apache.iotdb.itbase.exception.InconsistentDataException;
+import org.apache.iotdb.metrics.utils.SystemType;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +48,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -66,14 +63,15 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-public class IoTDBRegionMigrateReliabilityIT {
+public class IoTDBRegionMigrateReliabilityITFramework {
private static final Logger LOGGER =
- LoggerFactory.getLogger(IoTDBRegionMigrateReliabilityIT.class);
+ LoggerFactory.getLogger(IoTDBRegionMigrateReliabilityITFramework.class);
private static final String INSERTION =
"INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 10.1,
20.7)";
private static final String SHOW_REGIONS = "show regions";
private static final String SHOW_DATANODES = "show datanodes";
private static final String REGION_MIGRATE_COMMAND_FORMAT = "migrate region
%d from %d to %d";
+ ExecutorService executorService =
IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT");
@Before
public void setUp() throws Exception {
@@ -90,131 +88,59 @@ public class IoTDBRegionMigrateReliabilityIT {
EnvFactory.getEnv().cleanClusterEnvironment();
}
- // region Normal tests
-
- @Test
- public void normal1C2DTest() throws Exception {
- generalTest(1, 1, 1, 2, buildSet(), buildSet());
- }
-
- @Test
- public void normal3C3DTest() throws Exception {
- generalTest(2, 3, 3, 3, buildSet(), buildSet());
- }
-
- // endregion
-
- // region ConfigNode crash tests
- @Test
- public void cnCrashDuringPreCheck() throws Exception {
- generalTest(
- 1, 1, 1, 2,
buildSet(RegionTransitionState.REGION_MIGRATE_PREPARE.toString()), buildSet());
- }
-
- @Test
- public void cnCrashDuringCreatePeer() throws Exception {
- generalTest(
- 1, 1, 1, 2,
buildSet(AddRegionPeerState.CREATE_NEW_REGION_PEER.toString()), buildSet());
- }
-
- @Test
- public void cnCrashDuringDoAddPeer() throws Exception {
- generalTest(1, 1, 1, 2,
buildSet(AddRegionPeerState.DO_ADD_REGION_PEER.toString()), buildSet());
- }
-
- @Test
- public void cnCrashDuringUpdateCache() throws Exception {
- generalTest(
- 1,
- 1,
- 1,
- 2,
- buildSet(AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE.toString()),
- buildSet());
- }
-
- @Test
- public void cnCrashDuringChangeRegionLeader() throws Exception {
- generalTest(
- 1, 1, 1, 2,
buildSet(RegionTransitionState.CHANGE_REGION_LEADER.toString()), buildSet());
- }
-
- @Test
- public void cnCrashDuringRemoveRegionPeer() throws Exception {
- generalTest(
- 1, 1, 1, 2,
buildSet(RemoveRegionPeerState.REMOVE_REGION_PEER.toString()), buildSet());
- }
-
- @Test
- public void cnCrashDuringDeleteOldRegionPeer() throws Exception {
- generalTest(
- 1, 1, 1, 2,
buildSet(RemoveRegionPeerState.DELETE_OLD_REGION_PEER.toString()), buildSet());
- }
-
- @Test
- public void cnCrashDuringRemoveRegionLocationCache() throws Exception {
- generalTest(
- 1,
- 1,
- 1,
- 2,
-
buildSet(RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE.toString()),
- buildSet());
- }
-
- @Test
- public void cnCrashTest() throws Exception {
- KeySetView<String, Boolean> killConfigNodeKeywords = buildSet();
- killConfigNodeKeywords.addAll(
- Arrays.stream(AddRegionPeerState.values())
- .map(Enum::toString)
- .collect(Collectors.toList()));
- killConfigNodeKeywords.addAll(
- Arrays.stream(RemoveRegionPeerState.values())
- .map(Enum::toString)
- .collect(Collectors.toList()));
- generalTest(1, 1, 1, 2, killConfigNodeKeywords, buildSet());
- }
-
- @Ignore
- @Test
- public void badKillPoint() throws Exception {
- generalTest(1, 1, 1, 2, buildSet("??"), buildSet());
- }
-
- // endregion
-
- // region coordinator DataNode crash tests
-
- @Test
- public void coordinatorCrashDuringRemovePeer() throws Exception {
- generalTest(1, 1, 1, 2, buildSet(),
buildSet(DataNodeKillPoints.CoordinatorRemovePeer.name()));
- }
-
- // endregion
-
- // region original DataNode crash tests
-
- @Test
- public void originalCrashDuringRemovePeer() throws Exception {
- generalTest(1, 1, 1, 2, buildSet(),
buildSet(DataNodeKillPoints.OriginalRemovePeer.name()));
+ public void successTest(
+ final int dataReplicateFactor,
+ final int schemaReplicationFactor,
+ final int configNodeNum,
+ final int dataNodeNum,
+ KeySetView<String, Boolean> killConfigNodeKeywords,
+ KeySetView<String, Boolean> killDataNodeKeywords)
+ throws Exception {
+ generalTestWithAllOptions(
+ dataReplicateFactor,
+ schemaReplicationFactor,
+ configNodeNum,
+ dataNodeNum,
+ killConfigNodeKeywords,
+ killDataNodeKeywords,
+ true,
+ true,
+ 0,
+ true);
}
- @Test
- public void originalCrashDuringDeleteLocalPeer() throws Exception {
- generalTest(
- 1, 1, 1, 2, buildSet(),
buildSet(DataNodeKillPoints.OriginalDeleteOldRegionPeer.name()));
+ public void failTest(
+ final int dataReplicateFactor,
+ final int schemaReplicationFactor,
+ final int configNodeNum,
+ final int dataNodeNum,
+ KeySetView<String, Boolean> killConfigNodeKeywords,
+ KeySetView<String, Boolean> killDataNodeKeywords)
+ throws Exception {
+ generalTestWithAllOptions(
+ dataReplicateFactor,
+ schemaReplicationFactor,
+ configNodeNum,
+ dataNodeNum,
+ killConfigNodeKeywords,
+ killDataNodeKeywords,
+ true,
+ true,
+ 60,
+ false);
}
- // region Helpers
-
- public void generalTest(
+ public void generalTestWithAllOptions(
final int dataReplicateFactor,
final int schemaReplicationFactor,
final int configNodeNum,
final int dataNodeNum,
KeySetView<String, Boolean> killConfigNodeKeywords,
- KeySetView<String, Boolean> killDataNodeKeywords)
+ KeySetView<String, Boolean> killDataNodeKeywords,
+ final boolean checkOriginalRegionDirDeleted,
+ final boolean checkConfigurationFileDeleted,
+ final int restartTime,
+ final boolean isMigrateSuccess)
throws Exception {
// prepare env
EnvFactory.getEnv()
@@ -222,24 +148,12 @@ public class IoTDBRegionMigrateReliabilityIT {
.getCommonConfig()
.setDataReplicationFactor(dataReplicateFactor)
.setSchemaReplicationFactor(schemaReplicationFactor);
+ EnvFactory.getEnv().registerConfigNodeKillPoints(new
ArrayList<>(killConfigNodeKeywords));
+ EnvFactory.getEnv().registerDataNodeKillPoints(new
ArrayList<>(killDataNodeKeywords));
EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum);
- ExecutorService service =
IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT");
- EnvFactory.getEnv()
- .getConfigNodeWrapperList()
- .forEach(
- configNodeWrapper ->
- service.submit(() -> nodeLogKillPoint(configNodeWrapper,
killConfigNodeKeywords)));
- EnvFactory.getEnv()
- .getDataNodeWrapperList()
- .forEach(
- dataNodeWrapper ->
- service.submit(() -> nodeLogKillPoint(dataNodeWrapper,
killDataNodeKeywords)));
-
try (final Connection connection = EnvFactory.getEnv().getConnection();
- final Statement statement = connection.createStatement();
- final SyncConfigNodeIServiceClient configClient =
- (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ final Statement statement = connection.createStatement()) {
statement.execute(INSERTION);
@@ -247,38 +161,116 @@ public class IoTDBRegionMigrateReliabilityIT {
Map<Integer, Set<Integer>> regionMap = getRegionMap(result);
result = statement.executeQuery(SHOW_DATANODES);
- Set<Integer> dataNodeSet = new HashSet<>();
+ Set<Integer> allDataNodeId = new HashSet<>();
while (result.next()) {
- dataNodeSet.add(result.getInt(ColumnHeaderConstant.NODE_ID));
+ allDataNodeId.add(result.getInt(ColumnHeaderConstant.NODE_ID));
}
final int selectedRegion = selectRegion(regionMap);
final int originalDataNode = selectOriginalDataNode(regionMap,
selectedRegion);
- final int destDataNode = selectDestDataNode(dataNodeSet, regionMap,
selectedRegion);
+ final int destDataNode = selectDestDataNode(allDataNodeId, regionMap,
selectedRegion);
+
+ checkRegionFileExist(originalDataNode);
+ checkPeersExist(regionMap.get(selectedRegion), originalDataNode,
selectedRegion);
+
+ // set kill points
+ setConfigNodeKillPoints(killConfigNodeKeywords, restartTime);
+ setDataNodeKillPoints(killDataNodeKeywords, restartTime);
+ // region migration start
statement.execute(regionMigrateCommand(selectedRegion, originalDataNode,
destDataNode));
- awaitUntilSuccess(statement, selectedRegion, originalDataNode,
destDataNode);
+ boolean success = false;
+ try {
+ awaitUntilSuccess(statement, selectedRegion, originalDataNode,
destDataNode);
+ success = true;
+ } catch (ConditionTimeoutException e) {
+ LOGGER.error("Region migrate failed", e);
+ }
+ // Assert.assertTrue(isMigrateSuccess == success);
// make sure all kill points have been triggered
- Assert.assertTrue(killConfigNodeKeywords.isEmpty());
- Assert.assertTrue(killDataNodeKeywords.isEmpty());
+ checkKillPointsAllTriggered(killConfigNodeKeywords);
+ checkKillPointsAllTriggered(killDataNodeKeywords);
+
+ if (!success) {
+ restartAllDataNodes();
+ }
+ System.out.println(
+ "originalDataNode: "
+ +
EnvFactory.getEnv().dataNodeIdToWrapper(originalDataNode).get().getNodePath());
+ System.out.println(
+ "destDataNode: "
+ +
EnvFactory.getEnv().dataNodeIdToWrapper(destDataNode).get().getNodePath());
+
+ // check if there is anything remain
+ if (checkOriginalRegionDirDeleted) {
+ if (success) {
+ checkRegionFileClear(originalDataNode);
+ checkRegionFileExist(destDataNode);
+ } else {
+ checkRegionFileClear(destDataNode);
+ checkRegionFileExist(originalDataNode);
+ }
+ }
+ if (checkConfigurationFileDeleted) {
+ if (success) {
+ checkPeersClear(allDataNodeId, originalDataNode, selectedRegion);
+ } else {
+ checkPeersClear(allDataNodeId, destDataNode, selectedRegion);
+ }
+ }
- checkRegionFileClear(originalDataNode);
} catch (InconsistentDataException ignore) {
}
LOGGER.info("test pass");
}
+ private void restartAllDataNodes() {
+ EnvFactory.getEnv()
+ .getDataNodeWrapperList()
+ .parallelStream()
+ .forEach(
+ nodeWrapper -> {
+ nodeWrapper.stopForcibly();
+ nodeWrapper.start();
+ });
+ }
+
+ private void setConfigNodeKillPoints(
+ KeySetView<String, Boolean> killConfigNodeKeywords, int nodeRestartTime)
{
+ EnvFactory.getEnv()
+ .getConfigNodeWrapperList()
+ .forEach(
+ configNodeWrapper ->
+ executorService.submit(
+ () ->
+ nodeLogKillPoint(
+ configNodeWrapper, killConfigNodeKeywords,
nodeRestartTime)));
+ }
+
+ private void setDataNodeKillPoints(
+ KeySetView<String, Boolean> killDataNodeKeywords, int nodeRestartTime) {
+ EnvFactory.getEnv()
+ .getDataNodeWrapperList()
+ .forEach(
+ dataNodeWrapper ->
+ executorService.submit(
+ () ->
+ nodeLogKillPoint(dataNodeWrapper,
killDataNodeKeywords, nodeRestartTime)));
+ }
+
/**
- * Monitor the node's log and do something.
+ * Monitor the node's log and kill it when detect specific log.
*
* @param nodeWrapper Easy to understand
* @param killNodeKeywords When detect these keywords in node's log, stop
the node forcibly
*/
private static void nodeLogKillPoint(
- AbstractNodeWrapper nodeWrapper, KeySetView<String, Boolean>
killNodeKeywords) {
+ AbstractNodeWrapper nodeWrapper,
+ KeySetView<String, Boolean> killNodeKeywords,
+ int restartTime) {
if (killNodeKeywords.isEmpty()) {
return;
}
@@ -288,11 +280,29 @@ public class IoTDBRegionMigrateReliabilityIT {
} else {
logFileName = "log_datanode_all.log";
}
- ProcessBuilder builder =
- new ProcessBuilder(
- "tail",
- "-f",
- nodeWrapper.getNodePath() + File.separator + "logs" +
File.separator + logFileName);
+ SystemType type = SystemType.getSystemType();
+ ProcessBuilder builder;
+ if (type == SystemType.LINUX || type == SystemType.MAC) {
+ builder =
+ new ProcessBuilder(
+ "tail",
+ "-f",
+ nodeWrapper.getNodePath() + File.separator + "logs" +
File.separator + logFileName);
+ } else if (type == SystemType.WINDOWS) {
+ builder =
+ new ProcessBuilder(
+ "powershell",
+ "-Command",
+ "Get-Content "
+ + nodeWrapper.getNodePath()
+ + File.separator
+ + "logs"
+ + File.separator
+ + logFileName
+ + " -Wait");
+ } else {
+ throw new UnsupportedOperationException("Unsupported system type " +
type);
+ }
builder.redirectErrorStream(true);
try {
@@ -302,29 +312,53 @@ public class IoTDBRegionMigrateReliabilityIT {
String line;
while ((line = reader.readLine()) != null) {
// if trigger more than one keyword at a same time, test code may
have mistakes
-
Assert.assertTrue(killNodeKeywords.stream().filter(line::contains).count() <=
1);
+ Assert.assertTrue(
+ line,
+ killNodeKeywords.stream()
+ .map(KillPoint::addKillPointPrefix)
+ .filter(line::contains)
+ .count()
+ <= 1);
String finalLine = line;
Optional<String> detectedKeyword =
killNodeKeywords.stream()
- .filter(keyword -> finalLine.contains("breakpoint:" +
keyword))
+ .filter(keyword ->
finalLine.contains(KillPoint.addKillPointPrefix(keyword)))
.findAny();
if (detectedKeyword.isPresent()) {
// each keyword only trigger once
killNodeKeywords.remove(detectedKeyword.get());
+ LOGGER.info("Kill point is triggered: {}", detectedKeyword);
// reboot the node
nodeWrapper.stopForcibly();
+ if (restartTime > 0) {
+ try {
+ TimeUnit.SECONDS.sleep(restartTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
nodeWrapper.start();
}
if (killNodeKeywords.isEmpty()) {
break;
}
}
+ } catch (AssertionError e) {
+ LOGGER.error("gg", e);
+ throw e;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
+ void checkKillPointsAllTriggered(KeySetView<String, Boolean> killPoints) {
+ if (!killPoints.isEmpty()) {
+ killPoints.forEach(killPoint -> LOGGER.error("Kill point {} not
triggered", killPoint));
+ Assert.fail("Some kill points was not triggered");
+ }
+ }
+
private static String regionMigrateCommand(int who, int from, int to) {
return String.format(REGION_MIGRATE_COMMAND_FORMAT, who, from, to);
}
@@ -369,7 +403,7 @@ public class IoTDBRegionMigrateReliabilityIT {
AtomicReference<Exception> lastException = new AtomicReference<>();
try {
Awaitility.await()
- .atMost(1, TimeUnit.MINUTES)
+ .atMost(2, TimeUnit.MINUTES)
.until(
() -> {
try {
@@ -400,29 +434,78 @@ public class IoTDBRegionMigrateReliabilityIT {
}
}
+ private static void checkRegionFileExist(int dataNode) {
+ File originalRegionDir = new File(buildRegionDirPath(dataNode));
+ Assert.assertTrue(originalRegionDir.isDirectory());
+ Assert.assertNotEquals(0,
Objects.requireNonNull(originalRegionDir.listFiles()).length);
+ }
+
/** Check whether the original DataNode's region file has been deleted. */
private static void checkRegionFileClear(int dataNode) {
- String nodePath =
EnvFactory.getEnv().dataNodeIdToWrapper(dataNode).get().getNodePath();
- File originalRegionDir =
- new File(
- nodePath
- + File.separator
- + IoTDBConstant.DATA_FOLDER_NAME
- + File.separator
- + "datanode"
- + File.separator
- + IoTDBConstant.CONSENSUS_FOLDER_NAME
- + File.separator
- + "data_region");
+ File originalRegionDir = new File(buildRegionDirPath(dataNode));
Assert.assertTrue(originalRegionDir.isDirectory());
Assert.assertEquals(0,
Objects.requireNonNull(originalRegionDir.listFiles()).length);
+ LOGGER.info("Original region clear");
}
- private static KeySetView<String, Boolean> buildSet(String... keywords) {
+ private static void checkPeersExist(Set<Integer> dataNodes, int
originalDataNode, int regionId) {
+ dataNodes.forEach(targetDataNode -> checkPeerExist(targetDataNode,
originalDataNode, regionId));
+ }
+
+ private static void checkPeerExist(int checkTargetDataNode, int
originalDataNode, int regionId) {
+ File expectExistedFile =
+ new File(buildConfigurationDataFilePath(checkTargetDataNode,
originalDataNode, regionId));
+ Assert.assertTrue(
+ "configuration file should exist, but it didn't: " +
expectExistedFile.getPath(),
+ expectExistedFile.exists());
+ }
+
+ private static void checkPeersClear(Set<Integer> dataNodes, int
originalDataNode, int regionId) {
+ dataNodes.stream()
+ .filter(dataNode -> dataNode != originalDataNode)
+ .forEach(targetDataNode -> checkPeerClear(targetDataNode,
originalDataNode, regionId));
+ LOGGER.info("Peer clear");
+ }
+
+ private static void checkPeerClear(int checkTargetDataNode, int
originalDataNode, int regionId) {
+ File expectDeletedFile =
+ new File(buildConfigurationDataFilePath(checkTargetDataNode,
originalDataNode, regionId));
+ Assert.assertFalse(
+ "configuration file should be deleted, but it didn't: " +
expectDeletedFile.getPath(),
+ expectDeletedFile.exists());
+ }
+
+ private static String buildRegionDirPath(int dataNode) {
+ String nodePath =
EnvFactory.getEnv().dataNodeIdToWrapper(dataNode).get().getNodePath();
+ return nodePath
+ + File.separator
+ + IoTDBConstant.DATA_FOLDER_NAME
+ + File.separator
+ + "datanode"
+ + File.separator
+ + IoTDBConstant.CONSENSUS_FOLDER_NAME
+ + File.separator
+ + IoTDBConstant.DATA_REGION_FOLDER_NAME;
+ }
+
+ private static String buildConfigurationDataFilePath(
+ int localDataNodeId, int remoteDataNodeId, int regionId) {
+ String configurationDatDirName =
+ buildRegionDirPath(localDataNodeId) + File.separator + "1_" + regionId;
+ String expectDeletedFileName =
+
IoTConsensusServerImpl.generateConfigurationDatFileName(remoteDataNodeId);
+ return configurationDatDirName + File.separator + expectDeletedFileName;
+ }
+
+ protected static KeySetView<String, Boolean> noKillPoints() {
+ return ConcurrentHashMap.newKeySet();
+ }
+
+ @SafeVarargs
+ protected static <T extends Enum<?>> KeySetView<String, Boolean>
buildSet(T... keywords) {
KeySetView<String, Boolean> result = ConcurrentHashMap.newKeySet();
- result.addAll(Arrays.asList(keywords));
+ result.addAll(
+
Arrays.stream(keywords).map(KillPoint::enumToString).collect(Collectors.toList()));
return result;
}
-
- // endregion
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/notpass/datanodecrash/CoordinatorRemoveRemotePeerCrashIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/notpass/datanodecrash/CoordinatorRemoveRemotePeerCrashIT.java
new file mode 100644
index 00000000000..7533424909b
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/notpass/datanodecrash/CoordinatorRemoveRemotePeerCrashIT.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.notpass.datanodecrash;
+
+import
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusRemovePeerKillPoints;
+import
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework;
+
+import org.junit.Test;
+
+public class CoordinatorRemoveRemotePeerCrashIT extends
IoTDBRegionMigrateReliabilityITFramework {
+ private <T extends Enum<T>> void base(T... dataNodeKillPoints) throws
Exception {
+ successTest(1, 1, 1, 2, noKillPoints(), buildSet(dataNodeKillPoints));
+ }
+
+ @Test
+ public void initCrash() throws Exception {
+ base(IoTConsensusRemovePeerKillPoints.INIT);
+ }
+
+ @Test
+ public void crashAfterNotifyPeersToRemoveSyncLogChannel() throws Exception {
+
base(IoTConsensusRemovePeerKillPoints.AFTER_NOTIFY_PEERS_TO_REMOVE_SYNC_LOG_CHANNEL);
+ }
+
+ @Test
+ public void crashAfterInactivePeer() throws Exception {
+ base(IoTConsensusRemovePeerKillPoints.AFTER_INACTIVE_PEER);
+ }
+
+ @Test
+ public void crashAfterFinish() throws Exception {
+ base(IoTConsensusRemovePeerKillPoints.FINISH);
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/notpass/datanodecrash/IoTDBRegionMigrateDataNodeCrashIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/notpass/datanodecrash/IoTDBRegionMigrateDataNodeCrashIT.java
new file mode 100644
index 00000000000..6b023274aef
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/notpass/datanodecrash/IoTDBRegionMigrateDataNodeCrashIT.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.notpass.datanodecrash;
+
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework;
+
+import org.junit.Test;
+
+public class IoTDBRegionMigrateDataNodeCrashIT extends IoTDBRegionMigrateReliabilityITFramework {
+
+  /**
+   * Runs one region migration through the framework's {@code failTest}, with no ConfigNode kill
+   * points and exactly one DataNode kill point.
+   */
+  private void migrateWithDataNodeKillPoint(DataNodeKillPoints killPoint) throws Exception {
+    failTest(2, 2, 1, 3, noKillPoints(), buildSet(killPoint));
+  }
+
+  // region Coordinator DataNode crash tests
+
+  /** Coordinator dies after triggering the new peer's snapshot load, before building sync logs. */
+  @Test
+  public void coordinatorCrashDuringAddPeerTransition() throws Exception {
+    migrateWithDataNodeKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);
+  }
+
+  /** Coordinator dies after the spot clean that ends addRemotePeer. */
+  @Test
+  public void coordinatorCrashDuringAddPeerDone() throws Exception {
+    migrateWithDataNodeKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
+  }
+
+  // endregion ----------------------------------------------
+
+  // region Original DataNode crash tests
+
+  /** Original peer dies after handling the build-sync-log-channel request, before replying. */
+  @Test
+  public void originalCrashDuringAddPeerDone() throws Exception {
+    migrateWithDataNodeKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
+  }
+
+  // endregion ----------------------------------------------
+
+  // region Destination DataNode crash tests
+
+  /** Destination dies after the local consensus dir is created while creating the local peer. */
+  @Test
+  public void destinationCrashDuringCreateLocalPeer() throws Exception {
+    migrateWithDataNodeKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER);
+  }
+
+  /** Destination dies after loading the transferred snapshot, before replying. */
+  @Test
+  public void destinationCrashDuringAddPeerTransition() throws Exception {
+    migrateWithDataNodeKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION);
+  }
+
+  /** Destination dies while handling the activate-peer request, before marking itself active. */
+  @Test
+  public void destinationCrashDuringAddPeerDone() throws Exception {
+    migrateWithDataNodeKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_DONE);
+  }
+
+  // endregion ----------------------------------------------
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateConfigNodeCrashIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateConfigNodeCrashIT.java
new file mode 100644
index 00000000000..5b21cc81bb1
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateConfigNodeCrashIT.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass;
+
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework;
+import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
+import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
+import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class IoTDBRegionMigrateConfigNodeCrashIT extends IoTDBRegionMigrateReliabilityITFramework {
+
+  /**
+   * Runs one region migration through the framework's {@code successTest}, with the given
+   * ConfigNode kill points (procedure states) and no DataNode kill points.
+   */
+  private <T extends Enum<T>> void migrateWithConfigNodeKillPoints(T... killPoints)
+      throws Exception {
+    successTest(1, 1, 1, 2, buildSet(killPoints), noKillPoints());
+  }
+
+  @Test
+  @Ignore
+  public void cnCrashDuringPreCheck() throws Exception {
+    migrateWithConfigNodeKillPoints(RegionTransitionState.REGION_MIGRATE_PREPARE);
+  }
+
+  @Test
+  public void cnCrashDuringCreatePeer() throws Exception {
+    migrateWithConfigNodeKillPoints(AddRegionPeerState.CREATE_NEW_REGION_PEER);
+  }
+
+  @Test
+  public void cnCrashDuringDoAddPeer() throws Exception {
+    migrateWithConfigNodeKillPoints(AddRegionPeerState.DO_ADD_REGION_PEER);
+  }
+
+  @Test
+  public void cnCrashDuringUpdateCache() throws Exception {
+    migrateWithConfigNodeKillPoints(AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE);
+  }
+
+  @Test
+  public void cnCrashDuringChangeRegionLeader() throws Exception {
+    migrateWithConfigNodeKillPoints(RegionTransitionState.CHANGE_REGION_LEADER);
+  }
+
+  @Test
+  public void cnCrashDuringRemoveRegionPeer() throws Exception {
+    migrateWithConfigNodeKillPoints(RemoveRegionPeerState.REMOVE_REGION_PEER);
+  }
+
+  @Test
+  public void cnCrashDuringDeleteOldRegionPeer() throws Exception {
+    migrateWithConfigNodeKillPoints(RemoveRegionPeerState.DELETE_OLD_REGION_PEER);
+  }
+
+  @Test
+  public void cnCrashDuringRemoveRegionLocationCache() throws Exception {
+    migrateWithConfigNodeKillPoints(RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE);
+  }
+
+  /** Registers every AddRegionPeer and RemoveRegionPeer state as a ConfigNode kill point at once. */
+  @Test
+  public void cnCrashTest() throws Exception {
+    ConcurrentHashMap.KeySetView<String, Boolean> allPeerStateKillPoints = noKillPoints();
+    allPeerStateKillPoints.addAll(
+        Arrays.stream(AddRegionPeerState.values())
+            .map(KillPoint::enumToString)
+            .collect(Collectors.toList()));
+    allPeerStateKillPoints.addAll(
+        Arrays.stream(RemoveRegionPeerState.values())
+            .map(KillPoint::enumToString)
+            .collect(Collectors.toList()));
+    successTest(1, 1, 1, 2, allPeerStateKillPoints, noKillPoints());
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateNormalIT.java
similarity index 51%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
copy to
integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateNormalIT.java
index 9d08cc09b78..072530da626 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateNormalIT.java
@@ -17,12 +17,26 @@
* under the License.
*/
-package org.apache.iotdb.commons.utils;
+package org.apache.iotdb.confignode.it.regionmigration.pass;
-@TestOnly
-public enum DataNodeKillPoints {
- OriginalRemovePeer,
- OriginalDeleteOldRegionPeer,
+import
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
- CoordinatorRemovePeer,
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@Category({ClusterIT.class})
+@RunWith(IoTDBTestRunner.class)
+public class IoTDBRegionMigrateNormalIT extends IoTDBRegionMigrateReliabilityITFramework {
+
+  /** Baseline migration without any kill points, using {@code successTest(1, 1, 1, 2, ...)}. */
+  @Test
+  public void normal1C2DTest() throws Exception {
+    successTest(1, 1, 1, 2, noKillPoints(), noKillPoints());
+  }
+
+  /** Baseline migration without any kill points, using {@code successTest(2, 3, 3, 3, ...)}. */
+  @Test
+  public void normal3C3DTest() throws Exception {
+    successTest(2, 3, 3, 3, noKillPoints(), noKillPoints());
+  }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateOtherIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateOtherIT.java
new file mode 100644
index 00000000000..6d9444c9e4f
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateOtherIT.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass;
+
+import org.apache.iotdb.commons.utils.KillPoint.NeverTriggeredKillPoint;
+import
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateReliabilityITFramework;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBRegionMigrateOtherIT extends
IoTDBRegionMigrateReliabilityITFramework {
+ @Test
+ public void badKillPoint() throws Exception {
+ try {
+ successTest(
+ 1, 1, 1, 2,
buildSet(NeverTriggeredKillPoint.NEVER_TRIGGERED_KILL_POINT), noKillPoints());
+ } catch (AssertionError e) {
+ return;
+ }
+ Assert.fail("kill point not triggered but test pass");
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 008882b67cd..f66663a9483 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.manager.node;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -95,6 +96,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
/** {@link NodeManager} manages cluster node addition and removal requests. */
public class NodeManager {
@@ -353,6 +355,11 @@ public class NodeManager {
resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_RESTART);
resp.setRuntimeConfiguration(getRuntimeConfiguration().setClusterId(clusterId));
+ List<TConsensusGroupId> consensusGroupIds =
+ getPartitionManager().getAllReplicaSets(nodeId).stream()
+ .map(TRegionReplicaSet::getRegionId)
+ .collect(Collectors.toList());
+ resp.setConsensusGroupIds(consensusGroupIds);
return resp;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
index 000f47067fc..853262625b7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
@@ -41,6 +41,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint;
import static
org.apache.iotdb.confignode.procedure.state.AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE;
import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
@@ -79,11 +80,13 @@ public class AddRegionPeerProcedure
switch (state) {
case CREATE_NEW_REGION_PEER:
handler.createNewRegionPeer(consensusGroupId, destDataNode);
+ setKillPoint(state);
setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
break;
case DO_ADD_REGION_PEER:
TSStatus tsStatus =
handler.addRegionPeer(this.getProcId(), destDataNode,
consensusGroupId, coordinator);
+ setKillPoint(state);
TRegionMigrateResult result;
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
result = handler.waitTaskFinish(this.getProcId(), coordinator);
@@ -124,6 +127,7 @@ public class AddRegionPeerProcedure
}
case UPDATE_REGION_LOCATION_CACHE:
handler.addRegionLocation(consensusGroupId, destDataNode);
+ setKillPoint(state);
LOGGER.info("AddRegionPeer state {} complete", state);
LOGGER.info(
"AddRegionPeerProcedure success, region {} has been added to
DataNode {}",
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
index b6f418f0dc9..7823a6dc826 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.impl.region;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
@@ -103,6 +104,7 @@ public class RegionMigrateProcedure
break;
case CHANGE_REGION_LEADER:
handler.changeRegionLeader(consensusGroupId, originalDataNode,
destDataNode);
+ KillPoint.setKillPoint(state);
setNextState(RegionTransitionState.REMOVE_REGION_PEER);
break;
case REMOVE_REGION_PEER:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index fdf62606e66..7055a274874 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -42,6 +42,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint;
import static
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.DELETE_OLD_REGION_PEER;
import static
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE;
import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
@@ -81,6 +82,7 @@ public class RemoveRegionPeerProcedure
tsStatus =
handler.removeRegionPeer(
this.getProcId(), targetDataNode, consensusGroupId,
coordinator);
+ setKillPoint(state);
if (tsStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
throw new ProcedureException("REMOVE_REGION_PEER executed failed
in DataNode");
}
@@ -94,6 +96,7 @@ public class RemoveRegionPeerProcedure
case DELETE_OLD_REGION_PEER:
tsStatus =
handler.deleteOldRegionPeer(this.getProcId(), targetDataNode,
consensusGroupId);
+ setKillPoint(state);
if (tsStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
throw new ProcedureException("DELETE_OLD_REGION_PEER executed
failed in DataNode");
}
@@ -106,6 +109,7 @@ public class RemoveRegionPeerProcedure
break;
case REMOVE_REGION_LOCATION_CACHE:
handler.removeRegionLocation(consensusGroupId, targetDataNode);
+ setKillPoint(state);
LOGGER.info("RemoveRegionPeer state {} success", state);
LOGGER.info(
"RemoveRegionPeerProcedure success, region {} has been removed
from DataNode {}",
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index dc857dc18cd..e9bb1fb6eb5 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -29,6 +29,9 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusRemovePeerKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.IStateMachine;
@@ -253,6 +256,7 @@ public class IoTConsensus implements IConsensus {
() ->
new ConsensusException(
String.format("Unable to create consensus dir for group
%s", groupId)));
+ KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER);
if (exist.get()) {
throw new ConsensusGroupAlreadyExistException(groupId);
}
@@ -287,22 +291,23 @@ public class IoTConsensus implements IConsensus {
logger.info("[IoTConsensus] inactivate new peer: {}", peer);
impl.inactivePeer(peer);
- // step 2: notify all the other Peers to build the sync connection to
newPeer
- logger.info("[IoTConsensus] notify current peers to build sync log...");
- impl.checkAndLockSafeDeletedSearchIndex();
- impl.notifyPeersToBuildSyncLogChannel(peer);
-
- // step 3: take snapshot
+ // step 2: take snapshot
logger.info("[IoTConsensus] start to take snapshot...");
+ impl.checkAndLockSafeDeletedSearchIndex();
impl.takeSnapshot();
- // step 4: transit snapshot
+ // step 3: transit snapshot
logger.info("[IoTConsensus] start to transit snapshot...");
impl.transitSnapshot(peer);
- // step 5: let the new peer load snapshot
+ // step 4: let the new peer load snapshot
logger.info("[IoTConsensus] trigger new peer to load snapshot...");
impl.triggerSnapshotLoad(peer);
+
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);
+
+ // step 5: notify all the other Peers to build the sync connection to
newPeer
+ logger.info("[IoTConsensus] notify current peers to build sync log...");
+ impl.notifyPeersToBuildSyncLogChannel(peer);
// step 6: active new Peer
logger.info("[IoTConsensus] activate new peer...");
@@ -311,8 +316,19 @@ public class IoTConsensus implements IConsensus {
// step 7: spot clean
logger.info("[IoTConsensus] do spot clean...");
doSpotClean(peer, impl);
+ KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
} catch (ConsensusGroupModifyPeerException e) {
+ try {
+ logger.info("[IoTConsensus] add remote peer failed, automatic cleanup
side effects...");
+
+ // clean up the sync log channel
+ impl.notifyPeersToRemoveSyncLogChannel(peer);
+
+ } catch (ConsensusGroupModifyPeerException mpe) {
+ logger.error(
+ "[IoTConsensus] failed to cleanup side effects after failed to add
remote peer", mpe);
+ }
throw new ConsensusException(e.getMessage());
}
}
@@ -335,21 +351,27 @@ public class IoTConsensus implements IConsensus {
throw new PeerNotInConsensusGroupException(groupId, peer.toString());
}
+ KillPoint.setKillPoint(IoTConsensusRemovePeerKillPoints.INIT);
+
try {
// let other peers remove the sync channel with target peer
impl.notifyPeersToRemoveSyncLogChannel(peer);
} catch (ConsensusGroupModifyPeerException e) {
throw new ConsensusException(e.getMessage());
}
+ KillPoint.setKillPoint(
+
IoTConsensusRemovePeerKillPoints.AFTER_NOTIFY_PEERS_TO_REMOVE_SYNC_LOG_CHANNEL);
try {
// let target peer reject new write
impl.inactivePeer(peer);
+
KillPoint.setKillPoint(IoTConsensusRemovePeerKillPoints.AFTER_INACTIVE_PEER);
// wait its SyncLog to complete
impl.waitTargetPeerUntilSyncLogCompleted(peer);
} catch (ConsensusGroupModifyPeerException e) {
throw new ConsensusException(e.getMessage());
}
+ KillPoint.setKillPoint(IoTConsensusRemovePeerKillPoints.FINISH);
}
@Override
@@ -401,19 +423,19 @@ public class IoTConsensus implements IConsensus {
} else if (!impl.isActive()) {
throw new ConsensusException(
"peer is inactive and not ready to receive reset configuration
request.");
- } else {
- for (Peer peer : impl.getConfiguration()) {
- if (!peers.contains(peer)) {
- try {
- removeRemotePeer(groupId, peer);
- } catch (ConsensusException e) {
- logger.error("Failed to remove peer {} from group {}", peer,
groupId, e);
- throw e;
- }
+ }
+
+ for (Peer peer : impl.getConfiguration()) {
+ if (!peers.contains(peer)) {
+ try {
+ removeRemotePeer(groupId, peer);
+ } catch (ConsensusException e) {
+ logger.error("Failed to remove peer {} from group {}", peer,
groupId, e);
+ throw e;
}
}
- impl.resetConfiguration(peers);
}
+ impl.resetConfiguration(peers);
}
public IoTConsensusServerImpl getImpl(ConsensusGroupId groupId) {
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index ebee4982058..cdb797da4e4 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -87,6 +87,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
+import java.util.stream.Stream;
public class IoTConsensusServerImpl {
@@ -133,9 +134,8 @@ public class IoTConsensusServerImpl {
this.configuration = configuration;
if (configuration.isEmpty()) {
recoverConfiguration();
- } else {
- persistConfiguration();
}
+ persistConfiguration();
this.backgroundTaskService = backgroundTaskService;
this.config = config;
this.consensusGroupId = thisNode.getGroupId().toString();
@@ -572,7 +572,7 @@ public class IoTConsensusServerImpl {
configuration.add(targetPeer);
// step 3, persist configuration
logger.info("[IoTConsensus] persist new configuration: {}", configuration);
- persistConfigurationUpdate();
+ persistConfiguration();
}
public void removeSyncLogChannel(Peer targetPeer) throws
ConsensusGroupModifyPeerException {
@@ -584,16 +584,30 @@ public class IoTConsensusServerImpl {
configuration.remove(targetPeer);
checkAndUpdateSafeDeletedSearchIndex();
// step 3, persist configuration
- persistConfigurationUpdate();
+ persistConfiguration();
logger.info("[IoTConsensus] configuration updated to {}",
this.configuration);
} catch (IOException e) {
throw new ConsensusGroupModifyPeerException("error when remove
LogDispatcherThread", e);
}
}
+ // TODO: persist first and then delete old configuration file
public void persistConfiguration() {
try {
- serializeConfigurationAndFsyncToDisk(CONFIGURATION_FILE_NAME);
+ try (Stream<Path> stream = Files.walk(Paths.get(storageDir))) {
+ stream
+ .filter(Files::isRegularFile)
+ .filter(filePath ->
filePath.getFileName().toString().contains("configuration"))
+ .forEach(
+ filePath -> {
+ try {
+ Files.delete(filePath);
+ } catch (IOException e) {
+ logger.error("Unexpected error occurs when deleting old
configuration file", e);
+ }
+ });
+ }
+ serializeConfigurationAndFsyncToDisk();
} catch (IOException e) {
// TODO: (xingtanzjr) need to handle the IOException because the
IoTConsensus won't
// work expectedly
@@ -602,16 +616,6 @@ public class IoTConsensusServerImpl {
}
}
- public void persistConfigurationUpdate() throws
ConsensusGroupModifyPeerException {
- try {
- serializeConfigurationAndFsyncToDisk(CONFIGURATION_TMP_FILE_NAME);
- tmpConfigurationUpdate(configuration);
- } catch (IOException e) {
- throw new ConsensusGroupModifyPeerException(
- "Unexpected error occurs when update configuration", e);
- }
- }
-
public void recoverConfiguration() {
try {
Path tmpConfigurationPath =
@@ -631,9 +635,13 @@ public class IoTConsensusServerImpl {
// recover from split configuration file
Path dirPath = Paths.get(storageDir);
List<Peer> tmpPeerList = getConfiguration(dirPath,
CONFIGURATION_TMP_FILE_NAME);
- tmpConfigurationUpdate(tmpPeerList);
+ configuration.addAll(tmpPeerList);
List<Peer> peerList = getConfiguration(dirPath,
CONFIGURATION_FILE_NAME);
- configuration.addAll(peerList);
+ for (Peer peer : peerList) {
+ if (!configuration.contains(peer)) {
+ configuration.add(peer);
+ }
+ }
}
logger.info("Recover IoTConsensus server Impl, configuration: {}",
configuration);
} catch (IOException e) {
@@ -643,28 +651,18 @@ public class IoTConsensusServerImpl {
// @Compatibility
+  /**
+   * Loads peers from the legacy single configuration file (an int count followed by that many
+   * serialized peers) into {@code configuration}, then deletes the legacy file.
+   */
private void recoverFromOldConfigurationFile(Path oldConfigurationPath)
throws IOException {
+ // recover from old configuration file
ByteBuffer buffer =
ByteBuffer.wrap(Files.readAllBytes(oldConfigurationPath));
int size = buffer.getInt();
for (int i = 0; i < size; i++) {
configuration.add(Peer.deserialize(buffer));
}
- persistConfiguration();
+ // TODO: deleting the old file before the new per-peer files are persisted is unsafe; a crash
+ // between this delete and the next persist loses the configuration.
+ // NOTE(review): the new files presumably get written by the constructor's
+ // persistConfiguration() call after recoverConfiguration() — confirm.
Files.delete(oldConfigurationPath);
}
- private void tmpConfigurationUpdate(List<Peer> tmpPeerList) throws
IOException {
- for (Peer peer : tmpPeerList) {
- Path tmpConfigurationPath =
- Paths.get(
- new File(storageDir, peer.getNodeId() + "_" +
CONFIGURATION_TMP_FILE_NAME)
- .getAbsolutePath());
- Path configurationPath =
- Paths.get(
- new File(storageDir, peer.getNodeId() + "_" +
CONFIGURATION_FILE_NAME)
- .getAbsolutePath());
- Files.deleteIfExists(configurationPath);
- Files.move(tmpConfigurationPath, configurationPath);
- }
+  /** Returns the per-peer configuration file name: {@code <nodeId>_<CONFIGURATION_FILE_NAME>}. */
+  public static String generateConfigurationDatFileName(int nodeId) {
+    return String.join("_", Integer.toString(nodeId), CONFIGURATION_FILE_NAME);
 }
private List<Peer> getConfiguration(Path dirPath, String
configurationFileName)
@@ -876,10 +874,9 @@ public class IoTConsensusServerImpl {
return consensusGroupId;
}
- private void serializeConfigurationAndFsyncToDisk(String
configurationFileName)
- throws IOException {
+ private void serializeConfigurationAndFsyncToDisk() throws IOException {
for (Peer peer : configuration) {
- String peerConfigurationFileName = peer.getNodeId() + "_" +
configurationFileName;
+ String peerConfigurationFileName =
generateConfigurationDatFileName(peer.getNodeId());
FileOutputStream fileOutputStream =
new FileOutputStream(new File(storageDir,
peerConfigurationFileName));
try (DataOutputStream outputStream = new
DataOutputStream(fileOutputStream)) {
@@ -888,8 +885,8 @@ public class IoTConsensusServerImpl {
try {
fileOutputStream.flush();
fileOutputStream.getFD().sync();
- } catch (IOException e) {
- logger.error("Failed to fsync the configuration file {}",
peerConfigurationFileName, e);
+ } catch (IOException ignore) {
+ // ignore
}
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 2a80ea3281a..6c4b7cc6777 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.consensus.iot.service;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
@@ -169,6 +171,7 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
resultHandler.onComplete(new TActivatePeerRes(status));
return;
}
+ KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_DONE);
impl.setActive(true);
resultHandler.onComplete(
new TActivatePeerRes(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
@@ -198,6 +201,7 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
responseStatus.setMessage(e.getMessage());
}
+ KillPoint.setKillPoint(DataNodeKillPoints.ORIGINAL_ADD_PEER_DONE);
resultHandler.onComplete(new TBuildSyncLogChannelRes(responseStatus));
}
@@ -294,6 +298,7 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Asy
return;
}
impl.loadSnapshot(req.snapshotId);
+ KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION);
resultHandler.onComplete(
new TTriggerSnapshotLoadRes(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1380a167505..27c07ff6bd0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -328,9 +328,14 @@ public class IoTDBConfig {
/** Consensus directory. */
private String consensusDir = IoTDBConstant.DN_DEFAULT_DATA_DIR +
File.separator + "consensus";
- private String dataRegionConsensusDir = consensusDir + File.separator +
"data_region";
+  /** Consensus directory for data regions, nested under {@code consensusDir}. */
+ private String dataRegionConsensusDir =
+ consensusDir + File.separator + IoTDBConstant.DATA_REGION_FOLDER_NAME;
- private String schemaRegionConsensusDir = consensusDir + File.separator +
"schema_region";
+  /**
+   * Consensus directory for invalid data regions, nested under {@code consensusDir}.
+   * NOTE(review): presumably where data-region consensus dirs no longer assigned to this DataNode
+   * are moved on restart — confirm against DataNode startup logic.
+   */
+ private String invalidDataRegionConsensusDir =
+ consensusDir + File.separator +
IoTDBConstant.INVALID_DATA_REGION_FOLDER_NAME;
+
+  /** Consensus directory for schema regions, nested under {@code consensusDir}. */
+ private String schemaRegionConsensusDir =
+ consensusDir + File.separator + IoTDBConstant.SCHEMA_REGION_FOLDER_NAME;
/** temp result directory for sortOperator */
private String sortTmpDir =
@@ -1445,8 +1450,12 @@ public class IoTDBConfig {
public void setConsensusDir(String consensusDir) {
this.consensusDir = consensusDir;
+    // Re-derive every region-specific consensus dir (data, schema, invalid data) from the new
+    // root so they stay nested under consensusDir.
- setDataRegionConsensusDir(consensusDir + File.separator + "data_region");
- setSchemaRegionConsensusDir(consensusDir + File.separator +
"schema_region");
+ setDataRegionConsensusDir(
+ consensusDir + File.separator + IoTDBConstant.DATA_REGION_FOLDER_NAME);
+ setSchemaRegionConsensusDir(
+ consensusDir + File.separator +
IoTDBConstant.SCHEMA_REGION_FOLDER_NAME);
+ setInvalidDataRegionConsensusDir(
+ consensusDir + File.separator +
IoTDBConstant.INVALID_DATA_REGION_FOLDER_NAME);
}
public String getDataRegionConsensusDir() {
@@ -1457,6 +1466,14 @@ public class IoTDBConfig {
this.dataRegionConsensusDir = dataRegionConsensusDir;
}
+ /**
+ * Returns the directory into which DataNode moves the IoTConsensus peer directories of data
+ * regions it finds invalid on restart. Defaults to consensusDir + File.separator +
+ * IoTDBConstant.INVALID_DATA_REGION_FOLDER_NAME and is re-derived by setConsensusDir.
+ */
+ public String getInvalidDataRegionConsensusDir() {
+ return invalidDataRegionConsensusDir;
+ }
+
+ /**
+ * Overrides the invalid data region consensus directory. Note that a later call to
+ * setConsensusDir overwrites this value with a path derived from the new consensus dir.
+ */
+ public void setInvalidDataRegionConsensusDir(String
invalidDataRegionConsensusDir) {
+ this.invalidDataRegionConsensusDir = invalidDataRegionConsensusDir;
+ }
+
public String getSchemaRegionConsensusDir() {
return schemaRegionConsensusDir;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 02ad2844374..1cdd3406574 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.service;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -29,6 +30,7 @@ import
org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -44,6 +46,7 @@ import org.apache.iotdb.commons.udf.UDFInformation;
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFManagementService;
+import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
@@ -54,6 +57,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.iot.IoTConsensus;
import org.apache.iotdb.db.conf.DataNodeStartupCheck;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -108,6 +112,9 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -450,6 +457,57 @@ public class DataNode implements DataNodeMBean {
}
}
+  // TODO: Implement in IConsensus, not in DataNode
+  /**
+   * Lists the consensus group ids of the data region peer directories present on local disk.
+   *
+   * <p>Each sub-directory of the data region consensus dir is expected to be named
+   * "{type}_{id}"; the first two "_"-separated parts are parsed as integers and passed to {@code
+   * ConsensusGroupId.Factory.create(int, int)}. Entries that are not directories, or whose names
+   * cannot be parsed, are skipped with a warning instead of aborting DataNode startup with an
+   * unchecked exception.
+   *
+   * @return the locally present consensus group ids; empty when the data region consensus
+   *     protocol is Ratis, when the directory does not exist, or when it cannot be listed
+   */
+  private List<ConsensusGroupId> getConsensusGroupId() {
+    List<ConsensusGroupId> consensusGroupIds = new ArrayList<>();
+    // Constant-first comparison: does not NPE if the protocol class is unset.
+    if (ConsensusFactory.RATIS_CONSENSUS.equals(config.getDataRegionConsensusProtocolClass())) {
+      // Only the IoTConsensus peer directory layout is understood here.
+      return consensusGroupIds;
+    }
+    String dataRegionConsensusDir = config.getDataRegionConsensusDir();
+    File consensusDir = new File(dataRegionConsensusDir);
+    if (!consensusDir.isDirectory()) {
+      // The directory was never created, so there are no local peers to report.
+      return consensusGroupIds;
+    }
+    try (DirectoryStream<Path> stream = Files.newDirectoryStream(consensusDir.toPath())) {
+      for (Path path : stream) {
+        if (!Files.isDirectory(path)) {
+          continue;
+        }
+        String[] items = path.getFileName().toString().split("_");
+        if (items.length < 2) {
+          logger.warn("Skip unrecognized consensus peer dir {}", path);
+          continue;
+        }
+        try {
+          consensusGroupIds.add(
+              ConsensusGroupId.Factory.create(
+                  Integer.parseInt(items[0]), Integer.parseInt(items[1])));
+        } catch (IllegalArgumentException e) {
+          // NumberFormatException is an IllegalArgumentException; this also covers an unknown
+          // type value if the factory rejects it with IllegalArgumentException.
+          logger.warn("Skip unrecognized consensus peer dir {}", path, e);
+        }
+      }
+    } catch (IOException e) {
+      logger.error("Cannot get consensus group id from {}", dataRegionConsensusDir, e);
+    }
+    return consensusGroupIds;
+  }
+
+  // TODO: remove for current version, add todo for rename
+  /**
+   * Moves the IoTConsensus peer directory of each given data region from the data region
+   * consensus dir to the invalid data region consensus dir.
+   *
+   * <p>Regions whose peer directory does not exist are skipped. If the move fails, the peer
+   * directory is deleted instead (destructive fallback). A failed deletion is only logged,
+   * together with its cause.
+   *
+   * @param invalidConsensusGroupIds data regions present on disk but absent from the ConfigNode
+   *     restart response
+   */
+  private void renameInvalidRegionDirs(List<ConsensusGroupId> invalidConsensusGroupIds) {
+    // Loop-invariant: resolve both parent directories once.
+    File dataRegionConsensusDir = new File(config.getDataRegionConsensusDir());
+    File invalidDataRegionConsensusDir = new File(config.getInvalidDataRegionConsensusDir());
+    for (ConsensusGroupId consensusGroupId : invalidConsensusGroupIds) {
+      File oldDir =
+          new File(IoTConsensus.buildPeerDir(dataRegionConsensusDir, consensusGroupId));
+      File newDir =
+          new File(IoTConsensus.buildPeerDir(invalidDataRegionConsensusDir, consensusGroupId));
+      if (!oldDir.exists() || FileUtils.moveFileSafe(oldDir, newDir)) {
+        continue;
+      }
+      logger.error("move {} to {} failed.", oldDir.getAbsolutePath(), newDir.getAbsolutePath());
+      try {
+        FileUtils.recursivelyDeleteFolder(oldDir.getPath());
+      } catch (IOException e) {
+        // Keep the cause: previously it was dropped, hiding why the cleanup failed.
+        logger.error("delete {} failed.", oldDir.getAbsolutePath(), e);
+      }
+    }
+  }
+
+  /**
+   * Moves aside every local data region peer directory whose consensus group is not contained
+   * in {@code dataNodeConsensusGroupIds}, logging which regions are affected.
+   *
+   * @param dataNodeConsensusGroupIds consensus group ids taken from the ConfigNode restart
+   *     response for this DataNode
+   */
+  private void removeInvalidRegions(List<ConsensusGroupId> dataNodeConsensusGroupIds) {
+    List<ConsensusGroupId> invalidConsensusGroupIds =
+        getConsensusGroupId().stream()
+            .filter(consensusGroupId -> !dataNodeConsensusGroupIds.contains(consensusGroupId))
+            .collect(Collectors.toList());
+    if (invalidConsensusGroupIds.isEmpty()) {
+      return;
+    }
+    // This relocates (or, on failure, deletes) region data at startup, so make it visible.
+    logger.info(
+        "Local data region(s) {} are not in the ConfigNode restart response, moving them to {}",
+        invalidConsensusGroupIds,
+        config.getInvalidDataRegionConsensusDir());
+    renameInvalidRegionDirs(invalidConsensusGroupIds);
+  }
+
private void sendRestartRequestToConfigNode() throws StartupException {
logger.info("Sending restart request to ConfigNode-leader...");
long startTime = System.currentTimeMillis();
@@ -500,6 +558,14 @@ public class DataNode implements DataNodeMBean {
"Restart request to cluster: {} is accepted, which takes {} ms.",
config.getClusterName(),
(endTime - startTime));
+
+ List<TConsensusGroupId> consensusGroupIds =
dataNodeRestartResp.getConsensusGroupIds();
+ List<ConsensusGroupId> dataNodeConsensusGroupIds =
+ consensusGroupIds.stream()
+ .map(ConsensusGroupId.Factory::createFromTConsensusGroupId)
+ .collect(Collectors.toList());
+
+ removeInvalidRegions(dataNodeConsensusGroupIds);
} else {
/* Throw exception when restart is rejected */
throw new StartupException(dataNodeRestartResp.getStatus().getMessage());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 90610bbcb53..0c9e28ae500 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProper
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.tsfile.fileSystem.FSType;
import org.slf4j.Logger;
@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
public class CommonConfig {
@@ -256,6 +258,12 @@ public class CommonConfig {
// time in nanosecond precision when starting up
private final long startUpNanosecond = System.nanoTime();
+ private final boolean isIntegrationTest =
+
System.getProperties().containsKey(IoTDBConstant.INTEGRATION_TEST_KILL_POINTS);
+
+ private final Set<String> enabledKillPoints =
+
KillPoint.parseKillPoints(System.getProperty(IoTDBConstant.INTEGRATION_TEST_KILL_POINTS));
+
CommonConfig() {
// Empty constructor
}
@@ -1073,4 +1081,12 @@ public class CommonConfig {
public long getStartUpNanosecond() {
return startUpNanosecond;
}
+
+ /**
+ * Whether the JVM system property named IoTDBConstant.INTEGRATION_TEST_KILL_POINTS is present
+ * (any value). Evaluated once, when this config object is constructed.
+ */
+ public boolean isIntegrationTest() {
+ return isIntegrationTest;
+ }
+
+ /**
+ * Kill point names parsed by KillPoint.parseKillPoints from the
+ * IoTDBConstant.INTEGRATION_TEST_KILL_POINTS system property at construction time; an empty
+ * immutable set when the property is absent.
+ */
+ public Set<String> getEnabledKillPoints() {
+ return enabledKillPoints;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index dcf059deb74..b6697483f9f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -233,6 +233,9 @@ public class IoTDBConstant {
public static final String UNSEQUENCE_FOLDER_NAME = "unsequence";
public static final String FILE_NAME_SEPARATOR = "-";
public static final String CONSENSUS_FOLDER_NAME = "consensus";
+ public static final String DATA_REGION_FOLDER_NAME = "data_region";
+ public static final String INVALID_DATA_REGION_FOLDER_NAME =
"invalid_data_region";
+ public static final String SCHEMA_REGION_FOLDER_NAME = "schema_region";
public static final String SNAPSHOT_FOLDER_NAME = "snapshot";
// system folder name
@@ -340,4 +343,6 @@ public class IoTDBConstant {
public static final String TIER_SEPARATOR = ";";
public static final String OBJECT_STORAGE_DIR = "object_storage";
+
+ public static final String INTEGRATION_TEST_KILL_POINTS =
"integrationTestKillPoints";
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index 401dfeb2477..f2788381b4a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -37,7 +37,6 @@ import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Objects;
-import java.util.concurrent.TimeUnit;
public class FileUtils {
private static final Logger LOGGER =
LoggerFactory.getLogger(FileUtils.class);
@@ -262,15 +261,6 @@ public class FileUtils {
return true;
}
- public static void logBreakpoint(String logContent) {
- LOGGER.info("breakpoint:{}", logContent);
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
public static File createHardLink(File sourceFile, File hardlink) throws
IOException {
if (!hardlink.getParentFile().exists() &&
!hardlink.getParentFile().mkdirs()) {
throw new IOException(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/DataNodeKillPoints.java
similarity index 78%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/DataNodeKillPoints.java
index 9d08cc09b78..1959537aa2c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/DataNodeKillPoints.java
@@ -17,12 +17,13 @@
* under the License.
*/
-package org.apache.iotdb.commons.utils;
+package org.apache.iotdb.commons.utils.KillPoint;
-@TestOnly
public enum DataNodeKillPoints {
- OriginalRemovePeer,
- OriginalDeleteOldRegionPeer,
-
- CoordinatorRemovePeer,
+ ORIGINAL_ADD_PEER_DONE,
+ DESTINATION_CREATE_LOCAL_PEER,
+ DESTINATION_ADD_PEER_TRANSITION,
+ DESTINATION_ADD_PEER_DONE,
+ COORDINATOR_ADD_PEER_TRANSITION,
+ COORDINATOR_ADD_PEER_DONE,
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/IoTConsensusRemovePeerKillPoints.java
similarity index 81%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
copy to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/IoTConsensusRemovePeerKillPoints.java
index 9d08cc09b78..4f80aa87212 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/IoTConsensusRemovePeerKillPoints.java
@@ -17,12 +17,11 @@
* under the License.
*/
-package org.apache.iotdb.commons.utils;
+package org.apache.iotdb.commons.utils.KillPoint;
-@TestOnly
-public enum DataNodeKillPoints {
- OriginalRemovePeer,
- OriginalDeleteOldRegionPeer,
-
- CoordinatorRemovePeer,
+public enum IoTConsensusRemovePeerKillPoints {
+ INIT,
+ AFTER_NOTIFY_PEERS_TO_REMOVE_SYNC_LOG_CHANNEL,
+ AFTER_INACTIVE_PEER,
+ FINISH,
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/KillPoint.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/KillPoint.java
new file mode 100644
index 00000000000..40d06f5d0ad
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/KillPoint.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.utils.KillPoint;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/** Helpers for marking "kill points" that integration tests can enable via a system property. */
+public class KillPoint {
+  private static final Logger LOGGER = LoggerFactory.getLogger(KillPoint.class);
+
+  /**
+   * Builds the log line emitted when an enabled kill point is reached.
+   *
+   * @param name kill point name, as produced by {@link #enumToString(Enum)}
+   * @return {@code name} prefixed with "[KILL POINT] "
+   */
+  public static String addKillPointPrefix(String name) {
+    return "[KILL POINT] " + name;
+  }
+
+  /**
+   * Parses a kill point list such as "[a, b, c]".
+   *
+   * <p>Square brackets and spaces are removed, the rest is split on commas, and each token is
+   * trimmed. Empty tokens are dropped, so {@code null}, {@code ""} and {@code "[]"} all yield an
+   * empty set.
+   *
+   * @param s something like "[a, b, c]"; may be {@code null}
+   * @return Set{a,b,c}
+   */
+  public static ImmutableSet<String> parseKillPoints(String s) {
+    if (s == null) {
+      return ImmutableSet.of();
+    }
+    Set<String> result =
+        Arrays.stream(s.replace("[", "").replace("]", "").replace(" ", "").split(","))
+            .map(String::trim)
+            .filter(killPoint -> !killPoint.isEmpty())
+            .collect(Collectors.toSet());
+    LOGGER.info("Kill point set: {}", result);
+    return ImmutableSet.copyOf(result);
+  }
+
+  /**
+   * Marks that execution reached kill point {@code x}.
+   *
+   * <p>This is a no-op unless the common config reports an integration test and {@code x} is
+   * among its enabled kill points. In that case the method logs {@link
+   * #addKillPointPrefix(String)} of the kill point name and then sleeps for one second.
+   *
+   * @param x any enum member
+   */
+  public static <T extends Enum<T>> void setKillPoint(T x) {
+    if (!CommonDescriptor.getInstance().getConfig().isIntegrationTest()) {
+      return;
+    }
+    String killPointName = enumToString(x);
+    if (!CommonDescriptor.getInstance()
+        .getConfig()
+        .getEnabledKillPoints()
+        .contains(killPointName)) {
+      return;
+    }
+    LOGGER.info(addKillPointPrefix(killPointName));
+    // NOTE(review): presumably the pause lets the test harness see the log line and kill this
+    // process before it proceeds -- confirm against the IT framework.
+    try {
+      TimeUnit.SECONDS.sleep(1);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Returns "{EnumSimpleName}.{CONSTANT}", the form compared against the enabled kill point set.
+   *
+   * <p>Uses {@link Enum#getDeclaringClass()} rather than {@code getClass()}: for a constant with
+   * its own class body, {@code getClass()} is an anonymous subclass whose simple name is empty,
+   * which would make the kill point impossible to enable.
+   *
+   * @param x any enum member
+   * @return the qualified kill point name
+   */
+  public static <T extends Enum<?>> String enumToString(T x) {
+    return x.getDeclaringClass().getSimpleName() + "." + x.name();
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/NeverTriggeredKillPoint.java
similarity index 83%
rename from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
rename to
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/NeverTriggeredKillPoint.java
index 9d08cc09b78..b802622f23b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/DataNodeKillPoints.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/KillPoint/NeverTriggeredKillPoint.java
@@ -17,12 +17,8 @@
* under the License.
*/
-package org.apache.iotdb.commons.utils;
+package org.apache.iotdb.commons.utils.KillPoint;
-@TestOnly
-public enum DataNodeKillPoints {
- OriginalRemovePeer,
- OriginalDeleteOldRegionPeer,
-
- CoordinatorRemovePeer,
+public enum NeverTriggeredKillPoint {
+ NEVER_TRIGGERED_KILL_POINT
}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index b32e4acb0f9..ec47654f1ad 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -130,6 +130,7 @@ struct TDataNodeRestartResp {
1: required common.TSStatus status
2: required list<common.TConfigNodeLocation> configNodeList
3: optional TRuntimeConfiguration runtimeConfiguration
+ 4: optional list<common.TConsensusGroupId> consensusGroupIds
}
struct TDataNodeRemoveReq {