This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 512f2ffd54 [IOTDB-3039] Add standalone recovery (#5731)
512f2ffd54 is described below

commit 512f2ffd540cb7da7ead8bbbf6ef355f133119fc
Author: SzyWilliam <[email protected]>
AuthorDate: Fri Apr 29 23:30:23 2022 +0800

    [IOTDB-3039] Add standalone recovery (#5731)
    
    * add standalone recovery
    
    * add standalone recovery
    
    * add standalone recovery
    
    * add standalone recovery
    
    * add standalone recovery
    
    * add standalone recovery
    
    * add standalone recovery
---
 .../consensus/standalone/StandAloneConsensus.java  | 50 +++++++++++-
 .../iotdb/consensus/standalone/RecoveryTest.java   | 94 ++++++++++++++++++++++
 .../standalone/StandAloneConsensusTest.java        |  4 +-
 3 files changed, 146 insertions(+), 2 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
index 4650db4c16..f0d15221f2 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.standalone;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -37,6 +38,9 @@ import 
org.apache.iotdb.consensus.statemachine.IStateMachine.Registry;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -66,7 +70,26 @@ class StandAloneConsensus implements IConsensus {
   }
 
   @Override
-  public void start() throws IOException {}
+  public void start() throws IOException {
+    if (!this.storageDir.exists()) {
+      storageDir.mkdirs();
+    } else {
+      try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(storageDir.toPath())) {
+        for (Path path : stream) {
+          String filename = path.getFileName().toString();
+          String[] items = filename.split("_");
+          TConsensusGroupType type = TConsensusGroupType.valueOf(items[0]);
+          ConsensusGroupId consensusGroupId = 
ConsensusGroupId.Factory.createEmpty(type);
+          consensusGroupId.setId(Integer.parseInt(items[1]));
+          TEndPoint endPoint = new TEndPoint(items[2], 
Integer.parseInt(items[3]));
+          stateMachineMap.put(
+              consensusGroupId,
+              new StandAloneServerImpl(
+                  new Peer(consensusGroupId, endPoint), 
registry.apply(consensusGroupId)));
+        }
+      }
+    }
+  }
 
   @Override
   public void stop() throws IOException {}
@@ -125,6 +148,19 @@ class StandAloneConsensus implements IConsensus {
           StandAloneServerImpl impl =
               new StandAloneServerImpl(peers.get(0), registry.apply(groupId));
           impl.start();
+          String groupPath =
+              storageDir
+                  + File.separator
+                  + groupId.getType()
+                  + "_"
+                  + groupId.getId()
+                  + "_"
+                  + peers.get(0).getEndpoint().ip
+                  + "_"
+                  + peers.get(0).getEndpoint().port;
+          File file = new File(groupPath);
+          file.mkdirs();
+
           return impl;
         });
     if (exist.get()) {
@@ -141,6 +177,18 @@ class StandAloneConsensus implements IConsensus {
     stateMachineMap.computeIfPresent(
         groupId,
         (k, v) -> {
+          String groupPath =
+              storageDir
+                  + File.separator
+                  + groupId.getType()
+                  + "_"
+                  + groupId.getId()
+                  + "_"
+                  + thisNode.ip
+                  + "_"
+                  + thisNode.port;
+          File file = new File(groupPath);
+          file.delete();
           exist.set(true);
           v.stop();
           return null;
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
new file mode 100644
index 0000000000..0c9f0e74f6
--- /dev/null
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/RecoveryTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.standalone;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
+
+import org.apache.ratis.util.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+public class RecoveryTest {
+  private final ConsensusGroupId schemaRegionId = new SchemaRegionId(1);
+  private IConsensus consensusImpl;
+  private static final String STANDALONE_CONSENSUS_CLASS_NAME =
+      "org.apache.iotdb.consensus.standalone.StandAloneConsensus";
+
+  public void constructConsensus() throws IOException {
+    consensusImpl =
+        ConsensusFactory.getConsensusImpl(
+                STANDALONE_CONSENSUS_CLASS_NAME,
+                new TEndPoint("localhost", 9000),
+                new File("./target/recovery"),
+                gid -> new EmptyStateMachine())
+            .orElseThrow(
+                () ->
+                    new IllegalArgumentException(
+                        String.format(
+                            ConsensusFactory.CONSTRUCT_FAILED_MSG,
+                            STANDALONE_CONSENSUS_CLASS_NAME)));
+    consensusImpl.start();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    constructConsensus();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    consensusImpl.stop();
+    FileUtils.deleteFully(new File("./target/recovery"));
+  }
+
+  @Test
+  public void recoveryTest() throws Exception {
+    consensusImpl.addConsensusGroup(
+        schemaRegionId,
+        Collections.singletonList(new Peer(schemaRegionId, new 
TEndPoint("0.0.0.0", 9000))));
+
+    consensusImpl.stop();
+    consensusImpl = null;
+
+    constructConsensus();
+
+    ConsensusGenericResponse response =
+        consensusImpl.addConsensusGroup(
+            schemaRegionId,
+            Collections.singletonList(new Peer(schemaRegionId, new 
TEndPoint("0.0.0.0", 9000))));
+
+    Assert.assertEquals(
+        response.getException().getMessage(),
+        new ConsensusGroupAlreadyExistException(schemaRegionId).getMessage());
+  }
+}
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 0275868843..986e347356 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.consensus.exception.IllegalPeerNumException;
 import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
 import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
+import org.apache.ratis.util.FileUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -134,7 +135,7 @@ public class StandAloneConsensusTest {
         ConsensusFactory.getConsensusImpl(
                 STANDALONE_CONSENSUS_CLASS_NAME,
                 new TEndPoint("localhost", 6667),
-                new File("./"),
+                new File("./target/standalone"),
                 gid -> {
                   switch (gid.getType()) {
                     case SchemaRegion:
@@ -156,6 +157,7 @@ public class StandAloneConsensusTest {
   @After
   public void tearDown() throws Exception {
     consensusImpl.stop();
+    FileUtils.deleteFully(new File("./target/standalone"));
   }
 
   @Test

Reply via email to