This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cde99e33a4 [IOTDB-3253] Upgrade dependency: Ratis from 2.2.0 to 2.3.0 
(#5991)
cde99e33a4 is described below

commit cde99e33a4242b0707e810c69e755b8d9724788e
Author: SzyWilliam <[email protected]>
AuthorDate: Wed May 25 13:25:30 2022 +0800

    [IOTDB-3253] Upgrade dependency: Ratis from 2.2.0 to 2.3.0 (#5991)
    
    * upgrade ratis 2.2.0 to 2.3.0
    
    * spotless
    
    * CI bug fix
    
    * CI bug fix
    
    * disable useCache option
    
    * spotless
    
    * fix reviews
---
 consensus/pom.xml                                  |  2 +-
 .../ratis/ApplicationStateMachineProxy.java        | 19 ++++-
 .../iotdb/consensus/ratis/RatisConsensus.java      |  4 +
 .../iotdb/consensus/ratis/SnapshotStorage.java     | 87 ++++++++++++++++++++--
 .../org/apache/iotdb/consensus/ratis/Utils.java    |  7 +-
 .../apache/iotdb/consensus/ratis/SnapshotTest.java | 12 ++-
 6 files changed, 116 insertions(+), 15 deletions(-)

diff --git a/consensus/pom.xml b/consensus/pom.xml
index 7284a5fee7..e4d873bb19 100644
--- a/consensus/pom.xml
+++ b/consensus/pom.xml
@@ -56,6 +56,6 @@
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
-        <ratis.version>2.2.0</ratis.version>
+        <ratis.version>2.3.0</ratis.version>
     </properties>
 </project>
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index ab0f65fa38..e6467401a0 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -86,8 +86,10 @@ public class ApplicationStateMachineProxy extends 
BaseStateMachine {
 
   @Override
   public void pause() {
-    getLifeCycle().transition(LifeCycle.State.PAUSING);
-    getLifeCycle().transition(LifeCycle.State.PAUSED);
+    if (getLifeCycleState() == LifeCycle.State.RUNNING) {
+      getLifeCycle().transition(LifeCycle.State.PAUSING);
+      getLifeCycle().transition(LifeCycle.State.PAUSED);
+    }
   }
 
   @Override
@@ -142,13 +144,22 @@ public class ApplicationStateMachineProxy extends 
BaseStateMachine {
     // require the application statemachine to take the latest snapshot
     String metadata = Utils.getMetadataFromTermIndex(lastApplied);
     File snapshotDir = snapshotStorage.getSnapshotDir(metadata);
+
+    // delete snapshotDir fully in case the last takeSnapshot() crashed
+    FileUtils.deleteFully(snapshotDir);
+
     snapshotDir.mkdir();
     if (!snapshotDir.isDirectory()) {
       logger.error("Unable to create snapshotDir at {}", snapshotDir);
       return RaftLog.INVALID_LOG_INDEX;
     }
-    boolean success = applicationStateMachine.takeSnapshot(snapshotDir);
-    if (!success) {
+
+    boolean applicationTakeSnapshotSuccess = 
applicationStateMachine.takeSnapshot(snapshotDir);
+    boolean addTermIndexMetafileSuccess =
+        snapshotStorage.addTermIndexMetaFile(snapshotDir, metadata);
+
+    if (!applicationTakeSnapshotSuccess || !addTermIndexMetafileSuccess) {
+      // this takeSnapshot failed, clean up files and directories
       // statemachine is supposed to clear snapshotDir on failure
       boolean isEmpty = snapshotDir.delete();
       if (!isEmpty) {
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 74916fba41..03bca95863 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -116,11 +116,15 @@ class RatisConsensus implements IConsensus {
    */
   public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, 
IStateMachine.Registry registry)
       throws IOException {
+    System.setProperty(
+        
"org.apache.ratis.thirdparty.io.netty.allocator.useCacheForAllThreads", 
"false");
     String address = Utils.IPAddress(endpoint);
     myself = Utils.fromTEndPointAndPriorityToRaftPeer(endpoint, 
DEFAULT_PRIORITY);
 
     RaftServerConfigKeys.setStorageDir(properties, 
Collections.singletonList(ratisStorageDir));
     RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
+    // TODO make this configurable so that RatisConsensusTest can trigger 
multiple snapshot processes
+    // RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, 20);
     RaftServerConfigKeys.Rpc.setSlownessTimeout(
         properties, TimeDuration.valueOf(10, TimeUnit.MINUTES));
     RaftServerConfigKeys.Rpc.setTimeoutMin(properties, TimeDuration.valueOf(2, 
TimeUnit.SECONDS));
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
index 8d6d2224a3..14d3d64590 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -43,17 +43,12 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
-/**
- * TODO: Warning, currently in Ratis 2.2.0, there is a bug in installSnapshot. 
In subsequent
- * installSnapshot, a follower may fail to install while the leader assume it 
success. This bug will
- * be triggered when the snapshot threshold is low. This is fixed in current 
Ratis Master, and
- * hopefully will be introduced in Ratis 2.3.0.
- */
 public class SnapshotStorage implements StateMachineStorage {
+  private final Logger logger = LoggerFactory.getLogger(SnapshotStorage.class);
   private final IStateMachine applicationStateMachine;
+  private final String META_FILE_PREFIX = ".ratis_meta.";
 
   private File stateMachineDir;
-  private final Logger logger = LoggerFactory.getLogger(SnapshotStorage.class);
 
   public SnapshotStorage(IStateMachine applicationStateMachine) {
     this.applicationStateMachine = applicationStateMachine;
@@ -87,6 +82,7 @@ public class SnapshotStorage implements StateMachineStorage {
   }
 
   public File findLatestSnapshotDir() {
+    moveSnapshotFileToSubDirectory();
     Path[] snapshots = getSortedSnapshotDirPaths();
     if (snapshots == null || snapshots.length == 0) {
       return null;
@@ -141,4 +137,81 @@ public class SnapshotStorage implements 
StateMachineStorage {
   public File getSnapshotDir(String snapshotMetadata) {
     return new File(stateMachineDir.getAbsolutePath() + File.separator + 
snapshotMetadata);
   }
+
+  /**
+   * Currently, we name the snapshotDir with Term_Index so that we can tell 
which directory contains
+   * the latest snapshot files. Unfortunately, when the leader installs a snapshot on 
a slow follower,
+   * current Ratis implementation will flatten the directory and place all the 
snapshots directly
+   * under statemachine dir. Under this scenario, we cannot restore Term_Index 
from directory name.
+   * We decided to add an empty metadata file, whose name encodes Term_Index, into 
the snapshotDir. This
+   * metadata file will be installed along with application snapshot files, so 
that Term_Index
+   * information is kept during InstallSnapshot.
+   */
+  public boolean addTermIndexMetaFile(File snapshotDir, String 
termIndexMetadata) {
+    File snapshotMetaFile = new File(getMetafilePath(snapshotDir, 
termIndexMetadata));
+    try {
+      return snapshotMetaFile.createNewFile();
+    } catch (IOException e) {
+      logger.warn("cannot create snapshot metafile: ", e);
+      return false;
+    }
+  }
+
+  private String getMetafilePath(File snapshotDir, String termIndexMetadata) {
+    // e.g. /_sm/3_39/.ratis_meta.3_39
+    return snapshotDir.getAbsolutePath() + File.separator + META_FILE_PREFIX + 
termIndexMetadata;
+  }
+
+  private String getMetafileMatcherRegex() {
+    // meta file should always end with term_index
+    return META_FILE_PREFIX + "\\d+_\\d+$";
+  }
+
+  /**
+   * After the leader installs a snapshot on a slow follower, Ratis will put all 
snapshot files directly
+   * under statemachineDir. We need to handle this special scenario and 
rearrange these files into the
+   * appropriate sub-directory: this function moves all snapshot files 
directly under /sm to
+   * /sm/term_index/.
+   */
+  void moveSnapshotFileToSubDirectory() {
+    File[] potentialMetafile =
+        stateMachineDir.listFiles((dir, name) -> 
name.matches(getMetafileMatcherRegex()));
+    if (potentialMetafile == null || potentialMetafile.length == 0) {
+      // the statemachine dir contains no direct metafile
+      return;
+    }
+    String metadata = 
potentialMetafile[0].getName().substring(META_FILE_PREFIX.length());
+
+    File snapshotDir = getSnapshotDir(metadata);
+    snapshotDir.mkdir();
+
+    File[] snapshotFiles = stateMachineDir.listFiles();
+
+    // move files to snapshotDir; if an error occurs, delete snapshotDir
+    try {
+      if (snapshotFiles == null) {
+        logger.error(
+            "An unexpected condition triggered. please check implementation "
+                + this.getClass().getName());
+        FileUtils.deleteFully(snapshotDir);
+        return;
+      }
+
+      for (File file : snapshotFiles) {
+        boolean success = file.renameTo(new File(snapshotDir + File.separator 
+ file.getName()));
+        if (!success) {
+          logger.warn(
+              "move snapshot file "
+                  + file.getAbsolutePath()
+                  + " to sub-directory "
+                  + snapshotDir.getAbsolutePath()
+                  + "failed");
+          FileUtils.deleteFully(snapshotDir);
+          break;
+        }
+      }
+    } catch (IOException e) {
+      logger.warn("delete directory failed: ", e);
+    }
+  }
 }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index e1773c7f80..48f8598230 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -141,8 +141,11 @@ public class Utils {
   }
 
   public static TermIndex getTermIndexFromDir(File snapshotDir) {
-    String ordinal = snapshotDir.getName();
-    String[] items = ordinal.split("_");
+    return getTermIndexFromMetadataString(snapshotDir.getName());
+  }
+
+  public static TermIndex getTermIndexFromMetadataString(String metadata) {
+    String[] items = metadata.split("_");
     return TermIndex.valueOf(Long.parseLong(items[0]), 
Long.parseLong(items[1]));
   }
 }
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java 
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
index e42f981706..9a741a5332 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
@@ -97,6 +97,7 @@ public class SnapshotTest {
     long index = proxy.takeSnapshot();
     Assert.assertEquals(index, 616);
     Assert.assertTrue(new File(snapshotFilename).exists());
+    Assert.assertTrue(new File(getSnapshotMetaFilename("421_616")).exists());
 
     // take a snapshot at 616-4217
     proxy.notifyTermIndexUpdated(616, 4217);
@@ -105,16 +106,25 @@ public class SnapshotTest {
     long indexLatest = proxy.takeSnapshot();
     Assert.assertEquals(indexLatest, 4217);
     Assert.assertTrue(new File(snapshotFilenameLatest).exists());
+    Assert.assertTrue(new File(getSnapshotMetaFilename("616_4217")).exists());
 
     // query the latest snapshot
     SnapshotInfo info = proxy.getLatestSnapshot();
     Assert.assertEquals(info.getTerm(), 616);
     Assert.assertEquals(info.getIndex(), 4217);
-    
Assert.assertTrue(info.getFiles().get(0).getPath().endsWith(snapshotFilenameLatest));
 
     // clean up
     proxy.getStateMachineStorage().cleanupOldSnapshots(null);
     Assert.assertFalse(new File(snapshotFilename).exists());
     Assert.assertTrue(new File(snapshotFilenameLatest).exists());
   }
+
+  private String getSnapshotMetaFilename(String termIndexMeta) {
+    return testDir.getAbsolutePath()
+        + File.separator
+        + termIndexMeta
+        + File.separator
+        + ".ratis_meta."
+        + termIndexMeta;
+  }
 }

Reply via email to