This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 512f2ffd54 [IOTDB-3039] Add standalone recovery (#5731)
512f2ffd54 is described below
commit 512f2ffd540cb7da7ead8bbbf6ef355f133119fc
Author: SzyWilliam <[email protected]>
AuthorDate: Fri Apr 29 23:30:23 2022 +0800
[IOTDB-3039] Add standalone recovery (#5731)
* add standalone recovery
* add standalone recovery
* add standalone recovery
* add standalone recovery
* add standalone recovery
* add standalone recovery
* add standalone recovery
---
.../consensus/standalone/StandAloneConsensus.java | 50 +++++++++++-
.../iotdb/consensus/standalone/RecoveryTest.java | 94 ++++++++++++++++++++++
.../standalone/StandAloneConsensusTest.java | 4 +-
3 files changed, 146 insertions(+), 2 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index 4650db4c16..f0d15221f2 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.standalone;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -37,6 +38,9 @@ import
org.apache.iotdb.consensus.statemachine.IStateMachine.Registry;
import java.io.File;
import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -66,7 +70,26 @@ class StandAloneConsensus implements IConsensus {
}
@Override
- public void start() throws IOException {}
+ public void start() throws IOException {
+ if (!this.storageDir.exists()) {
+ storageDir.mkdirs();
+ } else {
+ try (DirectoryStream<Path> stream =
Files.newDirectoryStream(storageDir.toPath())) {
+ for (Path path : stream) {
+ String filename = path.getFileName().toString();
+ String[] items = filename.split("_");
+ TConsensusGroupType type = TConsensusGroupType.valueOf(items[0]);
+ ConsensusGroupId consensusGroupId =
ConsensusGroupId.Factory.createEmpty(type);
+ consensusGroupId.setId(Integer.parseInt(items[1]));
+ TEndPoint endPoint = new TEndPoint(items[2],
Integer.parseInt(items[3]));
+ stateMachineMap.put(
+ consensusGroupId,
+ new StandAloneServerImpl(
+ new Peer(consensusGroupId, endPoint),
registry.apply(consensusGroupId)));
+ }
+ }
+ }
+ }
@Override
public void stop() throws IOException {}
@@ -125,6 +148,19 @@ class StandAloneConsensus implements IConsensus {
StandAloneServerImpl impl =
new StandAloneServerImpl(peers.get(0), registry.apply(groupId));
impl.start();
+ String groupPath =
+ storageDir
+ + File.separator
+ + groupId.getType()
+ + "_"
+ + groupId.getId()
+ + "_"
+ + peers.get(0).getEndpoint().ip
+ + "_"
+ + peers.get(0).getEndpoint().port;
+ File file = new File(groupPath);
+ file.mkdirs();
+
return impl;
});
if (exist.get()) {
@@ -141,6 +177,18 @@ class StandAloneConsensus implements IConsensus {
stateMachineMap.computeIfPresent(
groupId,
(k, v) -> {
+ String groupPath =
+ storageDir
+ + File.separator
+ + groupId.getType()
+ + "_"
+ + groupId.getId()
+ + "_"
+ + thisNode.ip
+ + "_"
+ + thisNode.port;
+ File file = new File(groupPath);
+ file.delete();
exist.set(true);
v.stop();
return null;
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
new file mode 100644
index 0000000000..0c9f0e74f6
--- /dev/null
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.standalone;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
+
+import org.apache.ratis.util.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+public class RecoveryTest {
+ private final ConsensusGroupId schemaRegionId = new SchemaRegionId(1);
+ private IConsensus consensusImpl;
+ private static final String STANDALONE_CONSENSUS_CLASS_NAME =
+ "org.apache.iotdb.consensus.standalone.StandAloneConsensus";
+
+ public void constructConsensus() throws IOException {
+ consensusImpl =
+ ConsensusFactory.getConsensusImpl(
+ STANDALONE_CONSENSUS_CLASS_NAME,
+ new TEndPoint("localhost", 9000),
+ new File("./target/recovery"),
+ gid -> new EmptyStateMachine())
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ String.format(
+ ConsensusFactory.CONSTRUCT_FAILED_MSG,
+ STANDALONE_CONSENSUS_CLASS_NAME)));
+ consensusImpl.start();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ constructConsensus();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ consensusImpl.stop();
+ FileUtils.deleteFully(new File("./target/recovery"));
+ }
+
+ @Test
+ public void recoveryTest() throws Exception {
+ consensusImpl.addConsensusGroup(
+ schemaRegionId,
+ Collections.singletonList(new Peer(schemaRegionId, new
TEndPoint("0.0.0.0", 9000))));
+
+ consensusImpl.stop();
+ consensusImpl = null;
+
+ constructConsensus();
+
+ ConsensusGenericResponse response =
+ consensusImpl.addConsensusGroup(
+ schemaRegionId,
+ Collections.singletonList(new Peer(schemaRegionId, new
TEndPoint("0.0.0.0", 9000))));
+
+ Assert.assertEquals(
+ response.getException().getMessage(),
+ new ConsensusGroupAlreadyExistException(schemaRegionId).getMessage());
+ }
+}
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 0275868843..986e347356 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.consensus.exception.IllegalPeerNumException;
import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.ratis.util.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -134,7 +135,7 @@ public class StandAloneConsensusTest {
ConsensusFactory.getConsensusImpl(
STANDALONE_CONSENSUS_CLASS_NAME,
new TEndPoint("localhost", 6667),
- new File("./"),
+ new File("./target/standalone"),
gid -> {
switch (gid.getType()) {
case SchemaRegion:
@@ -156,6 +157,7 @@ public class StandAloneConsensusTest {
@After
public void tearDown() throws Exception {
consensusImpl.stop();
+ FileUtils.deleteFully(new File("./target/standalone"));
}
@Test