This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new 5b8d10a2a2 [To rel/1.0][IOTDB-5345] Use the logical clock to identify the snapshot version of IoTConsensus (#8850)
5b8d10a2a2 is described below

commit 5b8d10a2a21acfc4bb82abd5240638af0d74fcc3
Author: BUAAserein <[email protected]>
AuthorDate: Thu Jan 12 17:31:15 2023 +0800

    [To rel/1.0][IOTDB-5345] Use the logical clock to identify the snapshot version of IoTConsensus (#8850)
    
    * IOTDB-5345: Use the logical clock to identify the snapshot version of IoTConsensus
---
 .../consensus/iot/IoTConsensusServerImpl.java      | 54 +++++++++++++--------
 .../apache/iotdb/consensus/iot/StabilityTest.java  | 55 ++++++++++++++++++++--
 2 files changed, 84 insertions(+), 25 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 1d668c281a..4b19f64093 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -74,15 +74,14 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
 
 public class IoTConsensusServerImpl {
 
@@ -103,7 +102,8 @@ public class IoTConsensusServerImpl {
   private final IoTConsensusConfig config;
   private final ConsensusReqReader reader;
   private volatile boolean active;
-  private String latestSnapshotId;
+  private String newSnapshotDirName;
+  private static final Pattern snapshotIndexPatten = 
Pattern.compile(".*[^\\d](?=(\\d+))");
   private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> 
syncClientManager;
   private final IoTConsensusServerMetrics metrics;
 
@@ -287,14 +287,11 @@ public class IoTConsensusServerImpl {
 
   public void takeSnapshot() throws ConsensusGroupModifyPeerException {
     try {
-      // TODO: We should use logic clock such as searchIndex rather than wall 
clock to mark the
-      // snapshot, otherwise there will be bugs in situations where the clock 
might fall back, such
-      // as CI
-      latestSnapshotId =
+      long newSnapshotIndex = getLatestSnapshotIndex() + 1;
+      newSnapshotDirName =
           String.format(
-              "%s_%s_%d",
-              SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), 
System.currentTimeMillis());
-      File snapshotDir = new File(storageDir, latestSnapshotId);
+              "%s_%s_%d", SNAPSHOT_DIR_NAME, thisNode.getGroupId().getId(), 
newSnapshotIndex);
+      File snapshotDir = new File(storageDir, newSnapshotDirName);
       if (snapshotDir.exists()) {
         FileUtils.deleteDirectory(snapshotDir);
       }
@@ -312,13 +309,13 @@ public class IoTConsensusServerImpl {
   }
 
   public void transitSnapshot(Peer targetPeer) throws 
ConsensusGroupModifyPeerException {
-    File snapshotDir = new File(storageDir, latestSnapshotId);
+    File snapshotDir = new File(storageDir, newSnapshotDirName);
     List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
     logger.info("transit snapshots: {}", snapshotPaths);
     try (SyncIoTConsensusServiceClient client =
         syncClientManager.borrowClient(targetPeer.getEndpoint())) {
       for (Path path : snapshotPaths) {
-        SnapshotFragmentReader reader = new 
SnapshotFragmentReader(latestSnapshotId, path);
+        SnapshotFragmentReader reader = new 
SnapshotFragmentReader(newSnapshotDirName, path);
         try {
           while (reader.hasNext()) {
             TSendSnapshotFragmentReq req = 
reader.next().toTSendSnapshotFragmentReq();
@@ -370,6 +367,22 @@ public class IoTConsensusServerImpl {
     return originalFilePath.substring(originalFilePath.indexOf(snapshotId));
   }
 
+  private long getLatestSnapshotIndex() {
+    long snapShotIndex = 0;
+    File directory = new File(storageDir);
+    File[] versionFiles = directory.listFiles((dir, name) -> 
name.startsWith(SNAPSHOT_DIR_NAME));
+    if (versionFiles == null || versionFiles.length == 0) {
+      return snapShotIndex;
+    }
+    for (File file : versionFiles) {
+      snapShotIndex =
+          Long.max(
+              snapShotIndex,
+              
Long.parseLong(snapshotIndexPatten.matcher(file.getName()).replaceAll("")));
+    }
+    return snapShotIndex;
+  }
+
   private void clearOldSnapshot() {
     File directory = new File(storageDir);
     File[] versionFiles = directory.listFiles((dir, name) -> 
name.startsWith(SNAPSHOT_DIR_NAME));
@@ -379,12 +392,13 @@ public class IoTConsensusServerImpl {
           thisNode.getGroupId());
       return;
     }
-    Arrays.sort(versionFiles, Comparator.comparing(File::getName));
-    for (int i = 0; i < versionFiles.length - 1; i++) {
-      try {
-        FileUtils.deleteDirectory(versionFiles[i]);
-      } catch (IOException e) {
-        logger.error("Delete old snapshot dir {} failed", 
versionFiles[i].getAbsolutePath(), e);
+    for (File file : versionFiles) {
+      if (!file.getName().equals(newSnapshotDirName)) {
+        try {
+          FileUtils.deleteDirectory(file);
+        } catch (IOException e) {
+          logger.error("Delete old snapshot dir {} failed", 
file.getAbsolutePath(), e);
+        }
       }
     }
   }
@@ -416,7 +430,7 @@ public class IoTConsensusServerImpl {
       TTriggerSnapshotLoadRes res =
           client.triggerSnapshotLoad(
               new TTriggerSnapshotLoadReq(
-                  thisNode.getGroupId().convertToTConsensusGroupId(), 
latestSnapshotId));
+                  thisNode.getGroupId().convertToTConsensusGroupId(), 
newSnapshotDirName));
       if (!isSuccess(res.status)) {
         throw new ConsensusGroupModifyPeerException(
             String.format("error when triggering snapshot load %s. %s", peer, 
res.getStatus()));
@@ -744,7 +758,7 @@ public class IoTConsensusServerImpl {
         syncClientManager.borrowClient(targetPeer.getEndpoint())) {
       TCleanupTransferredSnapshotReq req =
           new TCleanupTransferredSnapshotReq(
-              targetPeer.getGroupId().convertToTConsensusGroupId(), 
latestSnapshotId);
+              targetPeer.getGroupId().convertToTConsensusGroupId(), 
newSnapshotDirName);
       TCleanupTransferredSnapshotRes res = 
client.cleanupTransferredSnapshot(req);
       if (!isSuccess(res.getStatus())) {
         throw new ConsensusGroupModifyPeerException(
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java 
b/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
index e7db5ad6e3..ea7c15c8f1 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
 import org.apache.iotdb.consensus.iot.util.TestStateMachine;
 
 import org.apache.ratis.util.FileUtils;
@@ -78,7 +79,13 @@ public class StabilityTest {
   }
 
   @Test
-  public void snapshotTest() throws Exception {
+  public void allTest() throws Exception {
+    peerTest();
+    snapshotTest();
+    snapshotUpgradeTest();
+  }
+
+  public void peerTest() throws Exception {
     consensusImpl.createPeer(
         dataRegionId,
         Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", 9000))));
@@ -94,11 +101,15 @@ public class StabilityTest {
         consensusImpl.createPeer(
             dataRegionId,
             Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", 9000))));
-
     Assert.assertTrue(response.isSuccess());
+    consensusImpl.deletePeer(dataRegionId);
+  }
 
+  public void snapshotTest() throws IOException {
+    consensusImpl.createPeer(
+        dataRegionId,
+        Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", 9000))));
     consensusImpl.triggerSnapshot(dataRegionId);
-    Thread.sleep(10);
 
     File dataDir = new File(IoTConsensus.buildPeerDir(storageDir, 
dataRegionId));
 
@@ -108,14 +119,48 @@ public class StabilityTest {
     Assert.assertEquals(1, versionFiles1.length);
 
     consensusImpl.triggerSnapshot(dataRegionId);
-    Thread.sleep(10);
+
     consensusImpl.triggerSnapshot(dataRegionId);
 
     File[] versionFiles2 =
         dataDir.listFiles((dir, name) -> 
name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME));
     Assert.assertNotNull(versionFiles2);
     Assert.assertEquals(1, versionFiles2.length);
-
     Assert.assertNotEquals(versionFiles1[0].getName(), 
versionFiles2[0].getName());
+    consensusImpl.deletePeer(dataRegionId);
+  }
+
+  public void snapshotUpgradeTest() throws Exception {
+    consensusImpl.createPeer(
+        dataRegionId,
+        Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", 9000))));
+    consensusImpl.triggerSnapshot(dataRegionId);
+    long oldSnapshotIndex = System.currentTimeMillis();
+    String oldSnapshotDirName =
+        String.format(
+            "%s_%s_%d",
+            IoTConsensusServerImpl.SNAPSHOT_DIR_NAME, dataRegionId.getId(), 
oldSnapshotIndex);
+    File regionDir = new File(storageDir, "1_1");
+    File oldSnapshotDir = new File(regionDir, oldSnapshotDirName);
+    if (oldSnapshotDir.exists()) {
+      FileUtils.deleteFully(oldSnapshotDir);
+    }
+    if (!oldSnapshotDir.mkdirs()) {
+      throw new ConsensusGroupModifyPeerException(
+          String.format("%s: cannot mkdir for snapshot", dataRegionId));
+    }
+    consensusImpl.triggerSnapshot(dataRegionId);
+    Assert.assertFalse(oldSnapshotDir.exists());
+
+    File dataDir = new File(IoTConsensus.buildPeerDir(storageDir, 
dataRegionId));
+
+    File[] snapshotFiles =
+        dataDir.listFiles((dir, name) -> 
name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME));
+    Assert.assertNotNull(snapshotFiles);
+    Assert.assertEquals(1, snapshotFiles.length);
+    Assert.assertEquals(
+        oldSnapshotIndex + 1,
+        
Long.parseLong(snapshotFiles[0].getName().replaceAll(".*[^\\d](?=(\\d+))", 
"")));
+    consensusImpl.deletePeer(dataRegionId);
   }
 }

Reply via email to