This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6bd6ff634b [IOTDB-4549] Add ConfigNode snapshot integration test (#7466)
6bd6ff634b is described below
commit 6bd6ff634bc428b18c31b0573f5f069081beff52
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Sep 29 08:31:54 2022 +0800
[IOTDB-4549] Add ConfigNode snapshot integration test (#7466)
---
.../iotdb/consensus/ratis/RatisConsensus.java | 2 +
.../java/org/apache/iotdb/it/env/AbstractEnv.java | 1 +
.../java/org/apache/iotdb/it/env/Cluster1Env.java | 6 +
.../java/org/apache/iotdb/it/env/MppConfig.java | 7 +
.../org/apache/iotdb/it/env/RemoteServerEnv.java | 5 +
.../apache/iotdb/it/env/StandaloneOnMppEnv.java | 5 +
.../org/apache/iotdb/itbase/env/BaseConfig.java | 8 +
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 2 +
.../iotdb/confignode/IoTDBClusterPartitionIT.java | 19 ++-
.../confignode/IoTDBConfigNodeSnapshotIT.java | 186 +++++++++++++++++++++
.../org/apache/iotdb/db/it/env/StandaloneEnv.java | 5 +
11 files changed, 244 insertions(+), 2 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 487bfff9a7..61e61dd9e4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -648,10 +648,12 @@ class RatisConsensus implements IConsensus {
}
private ConsensusWriteResponse failedWrite(ConsensusException e) {
+ logger.error("write request failed with exception", e); // record the failure (message plus the exception) before it is wrapped into the response below
return ConsensusWriteResponse.newBuilder().setException(e).build();
}
private ConsensusReadResponse failedRead(ConsensusException e) {
+ logger.error("read request failed with exception", e); // record the failure (message plus the exception) before it is wrapped into the response below
return ConsensusReadResponse.newBuilder().setException(e).build();
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index b68c0043a3..b1dc1bdb30 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -380,6 +380,7 @@ public abstract class AbstractEnv implements BaseEnv {
new
DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
for (int i = 0; i < 30; i++) {
try {
+ // Return ConfigNode connection of the Seed-ConfigNode
return clientManager.borrowClient(
new TEndPoint(
configNodeWrapperList.get(0).getIp(),
configNodeWrapperList.get(0).getPort()));
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/Cluster1Env.java b/integration-test/src/main/java/org/apache/iotdb/it/env/Cluster1Env.java
index d409bb79d5..978581cbe0 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/Cluster1Env.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/Cluster1Env.java
@@ -31,6 +31,12 @@ public class Cluster1Env extends AbstractEnv {
super.initEnvironment(1, 3);
}
+ @Override
+ public void initClusterEnvironment(int configNodesNum, int dataNodesNum) { // lets the caller choose the node counts
+ logger.debug("=======start init cluster environment=======");
+ super.initEnvironment(configNodesNum, dataNodesNum); // delegates to the parent's initEnvironment with the requested counts
+ }
+
@Override
public void initBeforeTest() throws InterruptedException {
logger.debug("=======start init test=======");
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index 1c6f257fc5..dba61b2ddb 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -254,6 +254,13 @@ public class MppConfig implements BaseConfig {
return this;
}
+ @Override
+ public BaseConfig setRatisSnapshotTriggerThreshold(int
ratisSnapshotTriggerThreshold) {
+ confignodeProperties.setProperty( // the int is stored as a string in the ConfigNode properties under the key below
+ "ratis_snapshot_trigger_threshold",
String.valueOf(ratisSnapshotTriggerThreshold));
+ return this; // returns this config so setter calls can be chained
+ }
+
@Override
public BaseConfig setConcurrentCompactionThread(int
concurrentCompactionThread) {
confignodeProperties.setProperty(
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index a85905ba0f..7d81391010 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -54,6 +54,11 @@ public class RemoteServerEnv implements BaseEnv {
}
}
+ @Override
+ public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
+ // Intentionally a no-op: both node counts are ignored by this environment.
+ }
+
@Override
public void cleanAfterClass() {}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/StandaloneOnMppEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/StandaloneOnMppEnv.java
index a765c675d6..d770a5b464 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/StandaloneOnMppEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/StandaloneOnMppEnv.java
@@ -48,6 +48,11 @@ public class StandaloneOnMppEnv extends AbstractEnv {
initEnvironment();
}
+ @Override
+ public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
+ // Intentionally a no-op: both node counts are ignored by this environment.
+ }
+
@Override
public void initBeforeTest() {
logger.debug("=======start init test=======");
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index c1273b3bd2..bbd5e5a753 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -255,6 +255,14 @@ public interface BaseConfig {
return 86400;
}
+ default BaseConfig setRatisSnapshotTriggerThreshold(int
ratisSnapshotTriggerThreshold) {
+ return this; // default implementation ignores the value and returns this config unchanged
+ }
+
+ default int getRatisSnapshotTriggerThreshold() {
+ return 400000; // fixed fallback for configs that do not override this getter
+ }
+
default BaseConfig setConcurrentCompactionThread(int
concurrentCompactionThread) {
return this;
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 79550d2568..0f022ca5dd 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -38,6 +38,8 @@ public interface BaseEnv {
void initBeforeClass() throws InterruptedException;
+ void initClusterEnvironment(int configNodesNum, int dataNodesNum); // set up an environment with the given ConfigNode and DataNode counts; implementations may ignore the counts
+
void cleanAfterClass();
void initBeforeTest() throws InterruptedException;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
index fff68f9bba..d64d20cfcc 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
@@ -284,7 +284,7 @@ public class IoTDBClusterPartitionIT {
}
@Test
- public void testGetAndCreateDataPartition() throws TException, IOException {
+ public void testGetAndCreateDataPartition() throws TException, IOException,
InterruptedException {
final int seriesPartitionBatchSize = 100;
final int timePartitionBatchSize = 10;
@@ -324,7 +324,22 @@ public class IoTDBClusterPartitionIT {
// Test getOrCreateDataPartition, ConfigNode should create
DataPartition and return
dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap);
- dataPartitionTableResp =
client.getOrCreateDataPartitionTable(dataPartitionReq);
+ for (int retry = 0; retry < 5; retry++) {
+ // Build new Client since it's unstable
+ try (SyncConfigNodeIServiceClient configNodeClient =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getConfigNodeConnection()) {
+ dataPartitionTableResp =
+
configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+ if (dataPartitionTableResp != null) {
+ break;
+ }
+ } catch (Exception e) {
+ // Retry sometimes in order to avoid request timeout
+ LOGGER.error(e.getMessage());
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ Assert.assertNotNull(dataPartitionTableResp);
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
dataPartitionTableResp.getStatus().getCode());
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
new file mode 100644
index 0000000000..e9fba69f87
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeSnapshotIT.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+import org.apache.iotdb.it.env.ConfigFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBConfigNodeSnapshotIT { // sends many partition requests to a 3-ConfigNode, 3-DataNode cluster configured with the Ratis consensus class and a snapshot trigger threshold of 100
+
+ protected static String originalConfigNodeConsensusProtocolClass; // captured in setUp, restored in tearDown
+ private static final String testConfigNodeConsensusProtocolClass =
+ "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
+ protected static int originalRatisSnapshotTriggerThreshold; // captured in setUp, restored in tearDown
+ private static final int testRatisSnapshotTriggerThreshold = 100; // NOTE(review): presumably chosen low so snapshots trigger during the test - confirm
+
+ protected static long originalTimePartitionInterval; // captured in setUp, restored in tearDown
+ private static final long testTimePartitionInterval = 86400;
+
+ @Before
+ public void setUp() throws Exception {
+ originalConfigNodeConsensusProtocolClass =
+ ConfigFactory.getConfig().getConfigNodeConsesusProtocolClass();
+ ConfigFactory.getConfig()
+
.setConfigNodeConsesusProtocolClass(testConfigNodeConsensusProtocolClass);
+
+ originalRatisSnapshotTriggerThreshold =
+ ConfigFactory.getConfig().getRatisSnapshotTriggerThreshold();
+
ConfigFactory.getConfig().setRatisSnapshotTriggerThreshold(testRatisSnapshotTriggerThreshold);
+
+ originalTimePartitionInterval =
ConfigFactory.getConfig().getTimePartitionInterval();
+
ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
+
+ // Init a cluster environment with 3 ConfigNodes and 3 DataNodes
+ EnvFactory.getEnv().initClusterEnvironment(3, 3);
+ }
+
+ @After
+ public void tearDown() {
+ EnvFactory.getEnv().cleanAfterClass();
+
+ ConfigFactory.getConfig()
+
.setConfigNodeConsesusProtocolClass(originalConfigNodeConsensusProtocolClass);
+ ConfigFactory.getConfig()
+
.setRatisSnapshotTriggerThreshold(originalRatisSnapshotTriggerThreshold);
+
ConfigFactory.getConfig().setTimePartitionInterval(originalTimePartitionInterval);
+ }
+
+ private ByteBuffer generatePatternTreeBuffer(String path) // builds a PathPatternTree holding only this path and returns its serialized bytes
+ throws IllegalPathException, IOException {
+ PathPatternTree patternTree = new PathPatternTree();
+ patternTree.appendPathPattern(new PartialPath(path));
+ patternTree.constructTree();
+
+ PublicBAOS baos = new PublicBAOS();
+ patternTree.serialize(baos);
+ return ByteBuffer.wrap(baos.toByteArray());
+ }
+
+ @Test
+ public void testPartitionInfoSnapshot() throws IOException,
IllegalPathException, TException {
+ final String sg = "root.sg";
+ final int storageGroupNum = 10;
+ final int seriesPartitionSlotsNum = 100;
+ final int timePartitionSlotsNum = 10; // the nested loops below issue 10 x 100 schema requests and 10 x 100 x 10 data requests
+
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getConfigNodeConnection()) {
+
+ for (int i = 0; i < storageGroupNum; i++) {
+ String storageGroup = sg + i;
+ TSetStorageGroupReq setStorageGroupReq =
+ new TSetStorageGroupReq(new TStorageGroupSchema(storageGroup));
+ TSStatus status = client.setStorageGroup(setStorageGroupReq);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
+
+ for (int j = 0; j < seriesPartitionSlotsNum; j++) {
+ TSeriesPartitionSlot seriesPartitionSlot = new
TSeriesPartitionSlot(j);
+
+ // Create SchemaPartition
+ ByteBuffer patternTree = generatePatternTreeBuffer(storageGroup +
".d" + j + ".s");
+ TSchemaPartitionReq schemaPartitionReq = new
TSchemaPartitionReq(patternTree);
+ TSchemaPartitionTableResp schemaPartitionTableResp =
+ client.getOrCreateSchemaPartitionTable(schemaPartitionReq);
+ // Every request should succeed if snapshots are taken successfully
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ schemaPartitionTableResp.getStatus().getCode());
+
Assert.assertNotNull(schemaPartitionTableResp.getSchemaPartitionTable());
+ Assert.assertEquals(1,
schemaPartitionTableResp.getSchemaPartitionTableSize());
+ Assert.assertNotNull(
+
schemaPartitionTableResp.getSchemaPartitionTable().get(storageGroup));
+ Assert.assertEquals(
+ 1,
schemaPartitionTableResp.getSchemaPartitionTable().get(storageGroup).size());
+
+ for (int k = 0; k < timePartitionSlotsNum; k++) {
+ TTimePartitionSlot timePartitionSlot =
+ new TTimePartitionSlot(testTimePartitionInterval * k);
+
+ // Create DataPartition
+ Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
partitionSlotsMap =
+ new HashMap<>();
+ partitionSlotsMap.put(storageGroup, new HashMap<>());
+ partitionSlotsMap
+ .get(storageGroup)
+ .put(seriesPartitionSlot,
Collections.singletonList(timePartitionSlot));
+ TDataPartitionReq dataPartitionReq = new
TDataPartitionReq(partitionSlotsMap);
+ TDataPartitionTableResp dataPartitionTableResp =
+ client.getOrCreateDataPartitionTable(dataPartitionReq);
+ // Every request should succeed if snapshots are taken successfully
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ dataPartitionTableResp.getStatus().getCode());
+
Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+ Assert.assertEquals(1,
dataPartitionTableResp.getDataPartitionTableSize());
+
Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable().get(storageGroup));
+ Assert.assertEquals(
+ 1,
dataPartitionTableResp.getDataPartitionTable().get(storageGroup).size());
+ Assert.assertNotNull(
+ dataPartitionTableResp
+ .getDataPartitionTable()
+ .get(storageGroup)
+ .get(seriesPartitionSlot));
+ Assert.assertEquals(
+ 1,
+ dataPartitionTableResp
+ .getDataPartitionTable()
+ .get(storageGroup)
+ .get(seriesPartitionSlot)
+ .size());
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
index 4c45167ff0..2a2123ccba 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
@@ -49,6 +49,11 @@ public class StandaloneEnv implements BaseEnv {
EnvironmentUtils.envSetUp();
}
+ @Override
+ public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
+ // Intentionally a no-op: both node counts are ignored by this environment.
+ }
+
@Override
public void cleanAfterClass() {
cleanAfterTest();