This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new b05b69ac9 RATIS-1734. Allow StateMachine to decide snapshot install path (#774)
b05b69ac9 is described below

commit b05b69ac92348dd40a12d7c9fbb08f272d65759f
Author: William Song <[email protected]>
AuthorDate: Wed Nov 9 03:00:44 2022 +0800

    RATIS-1734. Allow StateMachine to decide snapshot install path (#774)
---
 .../ratis/statemachine/StateMachineStorage.java    |  15 +-
 .../org/apache/ratis/server/impl/ServerState.java  |   2 +-
 .../server/leader/InstallSnapshotRequests.java     |  25 +-
 .../ratis/server/storage/FileChunkReader.java      |  10 +-
 .../ratis/server/storage/SnapshotManager.java      |  50 +--
 .../ratis/InstallSnapshotFromLeaderTests.java      | 373 ++++++++++++++-------
 6 files changed, 321 insertions(+), 154 deletions(-)

diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
index 86ffdf6db..f5853e1fb 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
@@ -17,10 +17,11 @@
  */
 package org.apache.ratis.statemachine;
 
-import java.io.IOException;
-
 import org.apache.ratis.server.storage.RaftStorage;
 
+import java.io.File;
+import java.io.IOException;
+
 public interface StateMachineStorage {
 
   void init(RaftStorage raftStorage) throws IOException;
@@ -38,4 +39,14 @@ public interface StateMachineStorage {
   void format() throws IOException;
 
   void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) 
throws IOException;
+
+  /** @return the state machine directory. */
+  default File getSnapshotDir() {
+    return null;
+  }
+
+  /** @return the temporary directory. */
+  default File getTmpDir() {
+    return null;
+  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 1ee2cab5e..4dba31756 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -146,7 +146,7 @@ class ServerState implements Closeable {
           RaftServerConfigKeys.storageDir(prop));
     }
 
-    snapshotManager = new SnapshotManager(storage, id);
+    snapshotManager = new SnapshotManager(id, storage.getStorageDir(), 
stateMachine.getStateMachineStorage());
 
     // On start the leader is null, start the clock now
     this.leaderId = null;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
index 38d1f9a2a..f52253b24 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
@@ -25,16 +25,21 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.storage.FileChunkReader;
 import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.server.storage.RaftStorageDirectory;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.util.JavaUtils;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.function.Function;
 
 /**
  * An {@link Iterable} of {@link InstallSnapshotRequestProto} for sending a 
snapshot.
- *
+ * <p>
  * The snapshot is sent by one or more requests, where
  * a snapshot has one or more files, and
  * a file is sent by one or more chunks.
@@ -43,6 +48,7 @@ import java.util.NoSuchElementException;
 class InstallSnapshotRequests implements Iterable<InstallSnapshotRequestProto> 
{
   private final RaftServer.Division server;
   private final RaftPeerId followerId;
+  private final Function<FileInfo, Path> getRelativePath;
 
   /** The snapshot to be sent. */
   private final SnapshotInfo snapshot;
@@ -71,6 +77,21 @@ class InstallSnapshotRequests implements 
Iterable<InstallSnapshotRequestProto> {
     this.snapshotChunkMaxSize = snapshotChunkMaxSize;
     this.totalSize = 
snapshot.getFiles().stream().mapToLong(FileInfo::getFileSize).reduce(Long::sum).orElseThrow(
             () -> new IllegalStateException("Failed to compute total size for 
snapshot " + snapshot));
+
+    final File snapshotDir = 
server.getStateMachine().getStateMachineStorage().getSnapshotDir();
+    final Function<Path, Path> relativize;
+    if (snapshotDir != null) {
+      final Path dir = snapshotDir.toPath();
+      // add STATE_MACHINE_DIR_NAME for compatibility.
+      relativize = p -> new File(RaftStorageDirectory.STATE_MACHINE_DIR_NAME, 
dir.relativize(p).toString()).toPath();
+    } else {
+      final Path dir = 
server.getRaftStorage().getStorageDir().getRoot().toPath();
+      relativize = dir::relativize;
+    }
+    this.getRelativePath = info -> Optional.of(info.getPath())
+        .filter(Path::isAbsolute)
+        .map(relativize)
+        .orElseGet(info::getPath);
   }
 
   @Override
@@ -97,7 +118,7 @@ class InstallSnapshotRequests implements 
Iterable<InstallSnapshotRequestProto> {
     final FileInfo info = snapshot.getFiles().get(fileIndex);
     try {
       if (current == null) {
-        current = new FileChunkReader(info, 
server.getRaftStorage().getStorageDir());
+        current = new FileChunkReader(info, getRelativePath.apply(info));
       }
       final FileChunkProto chunk = current.readFileChunk(snapshotChunkMaxSize);
       if (chunk.getDone()) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
index 15ed98157..a5ee66258 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
@@ -30,7 +30,6 @@ import java.io.InputStream;
 import java.nio.file.Path;
 import java.security.DigestInputStream;
 import java.security.MessageDigest;
-import java.util.Optional;
 
 /** Read {@link FileChunkProto}s from a file. */
 public class FileChunkReader implements Closeable {
@@ -47,15 +46,12 @@ public class FileChunkReader implements Closeable {
    * Construct a reader from a file specified by the given {@link FileInfo}.
    *
    * @param info the information of the file.
-   * @param directory the directory where the file is stored.
+   * @param relativePath the relative path of the file.
    * @throws IOException if it failed to open the file.
    */
-  public FileChunkReader(FileInfo info, RaftStorageDirectory directory) throws 
IOException {
+  public FileChunkReader(FileInfo info, Path relativePath) throws IOException {
     this.info = info;
-    this.relativePath = Optional.of(info.getPath())
-        .filter(Path::isAbsolute)
-        .map(p -> directory.getRoot().toPath().relativize(p))
-        .orElse(info.getPath());
+    this.relativePath = relativePath;
     final File f = info.getPath().toFile();
     if (info.getFileDigest() == null) {
       digester = MD5Hash.getDigester();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index aaa62a783..294f0a205 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -17,32 +17,36 @@
  */
 package org.apache.ratis.server.storage;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.file.Path;
-import java.security.DigestOutputStream;
-import java.security.MessageDigest;
-import java.util.function.Supplier;
-
 import org.apache.ratis.io.CorruptedFileException;
 import org.apache.ratis.io.MD5Hash;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.proto.RaftProtos.FileChunkProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.util.ServerStringUtils;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MD5FileUtil;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
 /**
  * Manage snapshots of a raft peer.
  * TODO: snapshot should be treated as compaction log thus can be merged into
@@ -54,13 +58,23 @@ public class SnapshotManager {
   private static final String CORRUPT = ".corrupt";
   private static final String TMP = ".tmp";
 
-  private final RaftStorage storage;
   private final RaftPeerId selfId;
+
+  private final Supplier<File> snapshotDir;
+  private final Supplier<File> tmp;
+  private final Function<FileChunkProto, String>  getRelativePath;
   private final Supplier<MessageDigest> digester = 
JavaUtils.memoize(MD5Hash::getDigester);
 
-  public SnapshotManager(RaftStorage storage, RaftPeerId selfId) {
-    this.storage = storage;
+  public SnapshotManager(RaftPeerId selfId, RaftStorageDirectory dir, 
StateMachineStorage smStorage) {
     this.selfId = selfId;
+    this.snapshotDir = MemoizedSupplier.valueOf(
+        () -> 
Optional.ofNullable(smStorage.getSnapshotDir()).orElseGet(dir::getStateMachineDir));
+    this.tmp = MemoizedSupplier.valueOf(
+        () -> 
Optional.ofNullable(smStorage.getTmpDir()).orElseGet(dir::getTmpDir));
+
+    final Path smDir = dir.getStateMachineDir().toPath();
+    this.getRelativePath =
+        c -> smDir.relativize(new File(dir.getRoot(), 
c.getFilename()).toPath()).toString();
   }
 
   public void installSnapshot(StateMachine stateMachine,
@@ -68,10 +82,9 @@ public class SnapshotManager {
     final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest =
         request.getSnapshotChunk();
     final long lastIncludedIndex = 
snapshotChunkRequest.getTermIndex().getIndex();
-    final RaftStorageDirectory dir = storage.getStorageDir();
 
     // create a unique temporary directory
-    final File tmpDir =  new File(dir.getTmpDir(), "snapshot-" + 
snapshotChunkRequest.getRequestId());
+    final File tmpDir =  new File(tmp.get(), "snapshot-" + 
snapshotChunkRequest.getRequestId());
     FileUtils.createDirectories(tmpDir);
     tmpDir.deleteOnExit();
 
@@ -81,7 +94,6 @@ public class SnapshotManager {
     // TODO: Make sure that subsequent requests for the same installSnapshot 
are coming in order,
     // and are not lost when whole request cycle is done. Check requestId and 
requestIndex here
 
-    final Path stateMachineDir = dir.getStateMachineDir().toPath();
     for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) {
       SnapshotInfo pi = stateMachine.getLatestSnapshot();
       if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
@@ -90,9 +102,7 @@ public class SnapshotManager {
             + " with endIndex >= lastIncludedIndex " + lastIncludedIndex);
       }
 
-      String fileName = chunk.getFilename(); // this is relative to the root 
dir
-      final Path relative = stateMachineDir.relativize(new File(dir.getRoot(), 
fileName).toPath());
-      final File tmpSnapshotFile = new File(tmpDir, relative.toString());
+      final File tmpSnapshotFile = new File(tmpDir, 
getRelativePath.apply(chunk));
       FileUtils.createDirectories(tmpSnapshotFile);
 
       FileOutputStream out = null;
@@ -151,7 +161,7 @@ public class SnapshotManager {
     }
 
     if (snapshotChunkRequest.getDone()) {
-      rename(tmpDir, dir.getStateMachineDir());
+      rename(tmpDir, snapshotDir.get());
     }
   }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
index bc4de2ff4..c8e04bb00 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -34,9 +34,9 @@ import 
org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.FileListSnapshotInfo;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.LifeCycle;
-import org.apache.ratis.util.MD5FileUtil;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -44,142 +44,271 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
 
 public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends 
MiniRaftCluster>
-        extends BaseTest
-        implements MiniRaftCluster.Factory.Get<CLUSTER> {
-    static final Logger LOG = 
LoggerFactory.getLogger(InstallSnapshotFromLeaderTests.class);
-    {
-        final RaftProperties prop = getProperties();
-        prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
-                StateMachineWithMultiNestedSnapshotFile.class, 
StateMachine.class);
-        RaftServerConfigKeys.Log.setPurgeGap(prop, PURGE_GAP);
-        RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
-                prop, SNAPSHOT_TRIGGER_THRESHOLD);
-        RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, true);
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  static final Logger LOG = 
LoggerFactory.getLogger(InstallSnapshotFromLeaderTests.class);
+
+  {
+    final RaftProperties prop = getProperties();
+    RaftServerConfigKeys.Log.setPurgeGap(prop, PURGE_GAP);
+    RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
+        prop, SNAPSHOT_TRIGGER_THRESHOLD);
+    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, true);
+  }
+
+  private static final int SNAPSHOT_TRIGGER_THRESHOLD = 64;
+  private static final int PURGE_GAP = 8;
+
+  @Test
+  public void testMultiFileInstallSnapshot() throws Exception {
+    getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        StateMachineWithMultiNestedSnapshotFile.class, StateMachine.class);
+    runWithNewCluster(1, this::testMultiFileInstallSnapshot);
+  }
+
+  @Test
+  public void testSeparateSnapshotInstallPath() throws Exception {
+    getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        StateMachineWithSeparatedSnapshotPath.class, StateMachine.class);
+    runWithNewCluster(1, this::testMultiFileInstallSnapshot);
+  }
+
+  private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
+    try {
+      int i = 0;
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+
+      try (final RaftClient client = cluster.createClient(leaderId)) {
+        for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+          RaftClientReply
+              reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + 
i));
+          Assert.assertTrue(reply.isSuccess());
+        }
+
+        client.getSnapshotManagementApi(leaderId).create(3000);
+      }
+
+      final SnapshotInfo snapshot = 
cluster.getLeader().getStateMachine().getLatestSnapshot();
+      Assert.assertEquals(3, snapshot.getFiles().size());
+
+      // add two more peers
+      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
+          true);
+      // trigger setConfiguration
+      cluster.setConfiguration(change.allPeersInNewConf);
+
+      RaftServerTestUtil
+          .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
+
+      // Check the installed snapshot file number on each Follower matches 
with the
+      // leader snapshot.
+      for (RaftServer.Division follower : cluster.getFollowers()) {
+        Assert.assertEquals(3, 
follower.getStateMachine().getLatestSnapshot().getFiles().size());
+      }
+    } finally {
+      cluster.shutdown();
     }
+  }
+
+  private static class StateMachineWithMultiNestedSnapshotFile extends 
SimpleStateMachine4Testing {
+
+    File snapshotRoot;
+    File file1;
+    File file2;
+
+    @Override
+    public synchronized void initialize(RaftServer server, RaftGroupId 
groupId, RaftStorage raftStorage) throws IOException {
+      super.initialize(server, groupId, raftStorage);
 
-    private static final int SNAPSHOT_TRIGGER_THRESHOLD = 64;
-    private static final int PURGE_GAP = 8;
+      // contains two snapshot files
+      // sm/snapshot/1.bin
+      // sm/snapshot/sub/2.bin
+      snapshotRoot = new File(getSMdir(), "snapshot");
+      FileUtils.deleteFully(snapshotRoot);
+      file1 = new File(snapshotRoot, "1.bin");
+      file2 = new File(new File(snapshotRoot, "sub"), "2.bin");
+    }
 
-    @Test
-    public void testMultiFileInstallSnapshot() throws Exception {
-        runWithNewCluster(1, this::testMultiFileInstallSnapshot);
+    @Override
+    public synchronized void pause() {
+      if (getLifeCycle().getCurrentState() == LifeCycle.State.RUNNING) {
+        getLifeCycle().transition(LifeCycle.State.PAUSING);
+        getLifeCycle().transition(LifeCycle.State.PAUSED);
+      }
     }
 
-    private void testMultiFileInstallSnapshot(CLUSTER cluster) throws 
Exception {
-        try {
-            int i = 0;
-            RaftTestUtil.waitForLeader(cluster);
-            final RaftPeerId leaderId = cluster.getLeader().getId();
-
-            try (final RaftClient client = cluster.createClient(leaderId)) {
-                for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
-                    RaftClientReply
-                            reply = client.io().send(new 
RaftTestUtil.SimpleMessage("m" + i));
-                    Assert.assertTrue(reply.isSuccess());
-                }
-
-                client.getSnapshotManagementApi(leaderId).create(3000);
-            }
-
-            final SnapshotInfo snapshot = 
cluster.getLeader().getStateMachine().getLatestSnapshot();
-            Assert.assertEquals(3, snapshot.getFiles().size());
-
-            // add two more peers
-            final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, 
true,
-                    true);
-            // trigger setConfiguration
-            cluster.setConfiguration(change.allPeersInNewConf);
-
-            RaftServerTestUtil
-                    .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, 
null);
-
-            // Check the installed snapshot file number on each Follower 
matches with the
-            // leader snapshot.
-            for (RaftServer.Division follower : cluster.getFollowers()) {
-                
Assert.assertEquals(follower.getStateMachine().getLatestSnapshot().getFiles().size(),
 3);
-            }
-        } finally {
-            cluster.shutdown();
+    @Override
+    public long takeSnapshot() {
+      final TermIndex termIndex = getLastAppliedTermIndex();
+      if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) {
+        return RaftLog.INVALID_LOG_INDEX;
+      }
+
+      final long endIndex = termIndex.getIndex();
+      try {
+        if (!snapshotRoot.exists()) {
+          FileUtils.createDirectories(snapshotRoot);
+          FileUtils.createDirectories(file1.getParentFile());
+          FileUtils.createDirectories(file2.getParentFile());
+          FileUtils.createNewFile(file1.toPath());
+          FileUtils.createNewFile(file2.toPath());
         }
+
+      } catch (IOException ioException) {
+        return RaftLog.INVALID_LOG_INDEX;
+      }
+
+      Assert.assertTrue(file1.exists());
+      Assert.assertTrue(file2.exists());
+      return super.takeSnapshot();
     }
 
-    private static class StateMachineWithMultiNestedSnapshotFile extends 
SimpleStateMachine4Testing {
+    @Override
+    public SnapshotInfo getLatestSnapshot() {
+      if (!snapshotRoot.exists() || !file1.exists() || !file2.exists()) {
+        return null;
+      }
+      List<FileInfo> files = new ArrayList<>();
+      files.add(new FileInfo(
+          file1.toPath(),
+          null));
+      files.add(new FileInfo(
+          file2.toPath(),
+          null));
+      Assert.assertEquals(2, files.size());
 
-        File snapshotRoot;
-        File file1;
-        File file2;
+      SnapshotInfo info = super.getLatestSnapshot();
+      if (info == null) {
+        return null;
+      }
+      files.add(info.getFiles().get(0));
+      return new FileListSnapshotInfo(files,
+          info.getTerm(), info.getIndex());
+    }
+  }
 
-        @Override
-        public synchronized void initialize(RaftServer server, RaftGroupId 
groupId, RaftStorage raftStorage) throws IOException {
-            super.initialize(server, groupId, raftStorage);
 
-            // contains two snapshot files
-            // sm/snapshot/1.bin
-            // sm/snapshot/sub/2.bin
-            snapshotRoot = new File(getSMdir(), "snapshot");
-            file1 = new File(snapshotRoot, "1.bin");
-            file2 = new File(new File(snapshotRoot, "sub"), "2.bin");
-        }
+  private static class StateMachineWithSeparatedSnapshotPath extends 
SimpleStateMachine4Testing {
+    private File root;
+    private File snapshotDir;
+    private File tmpDir;
 
-        @Override
-        public synchronized void pause() {
-            if (getLifeCycle().getCurrentState() == LifeCycle.State.RUNNING) {
-                getLifeCycle().transition(LifeCycle.State.PAUSING);
-                getLifeCycle().transition(LifeCycle.State.PAUSED);
-            }
-        }
+    @Override
+    public synchronized void initialize(RaftServer server, RaftGroupId 
groupId, RaftStorage raftStorage) throws IOException {
+      super.initialize(server, groupId, raftStorage);
+      this.root = new File("/tmp/ratis-tests/statemachine/" + 
getId().toString());
+      this.snapshotDir = new File(root, "snapshot");
+      this.tmpDir = new File(root, "tmp");
+      FileUtils.deleteFully(root);
+      Assert.assertTrue(this.snapshotDir.mkdirs());
+      Assert.assertTrue(this.tmpDir.mkdirs());
+      this.root.deleteOnExit();
+    }
+
+    @Override
+    public synchronized void pause() {
+      if (getLifeCycle().getCurrentState() == LifeCycle.State.RUNNING) {
+        getLifeCycle().transition(LifeCycle.State.PAUSING);
+        getLifeCycle().transition(LifeCycle.State.PAUSED);
+      }
+    }
+
+    @Override
+    public long takeSnapshot() {
+      final TermIndex lastApplied = getLastAppliedTermIndex();
+      final File snapshotTmpDir = new File(tmpDir, 
UUID.randomUUID().toString());
+      final File snapshotRealDir = new File(snapshotDir, 
String.format("%d_%d", lastApplied.getTerm(), lastApplied.getIndex()));
+
+      try {
+        FileUtils.deleteFully(snapshotRealDir);
+        FileUtils.deleteFully(snapshotTmpDir);
+        Assert.assertTrue(snapshotTmpDir.mkdirs());
+        final File snapshotFile1 = new File(snapshotTmpDir, "deer");
+        final File snapshotFile2 = new File(snapshotTmpDir, "loves");
+        final File snapshotFile3 = new File(snapshotTmpDir, "vegetable");
+        Assert.assertTrue(snapshotFile1.createNewFile());
+        Assert.assertTrue(snapshotFile2.createNewFile());
+        Assert.assertTrue(snapshotFile3.createNewFile());
+        FileUtils.move(snapshotTmpDir, snapshotRealDir);
+      } catch (IOException ioe) {
+        LOG.error("create snapshot data file failed", ioe);
+        return RaftLog.INVALID_LOG_INDEX;
+      }
+
+      return lastApplied.getIndex();
+    }
+
+    @Override
+    public SnapshotInfo getLatestSnapshot() {
+      Path[] sortedSnapshots = getSortedSnapshotDirPaths();
+      if (sortedSnapshots == null || sortedSnapshots.length == 0) {
+        return null;
+      }
 
-        @Override
-        public long takeSnapshot() {
-            final TermIndex termIndex = getLastAppliedTermIndex();
-            if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) {
-                return RaftLog.INVALID_LOG_INDEX;
-            }
-
-            final long endIndex = termIndex.getIndex();
-            try {
-                if (!snapshotRoot.exists()) {
-                    FileUtils.createDirectories(snapshotRoot);
-                    FileUtils.createDirectories(file1.getParentFile());
-                    FileUtils.createDirectories(file2.getParentFile());
-                    FileUtils.createNewFile(file1.toPath());
-                    FileUtils.createNewFile(file2.toPath());
-                }
-
-            } catch (IOException ioException) {
-                return RaftLog.INVALID_LOG_INDEX;
-            }
-
-            Assert.assertTrue(file1.exists());
-            Assert.assertTrue(file2.exists());
-            return super.takeSnapshot();
+      File latest = sortedSnapshots[sortedSnapshots.length - 1].toFile();
+      TermIndex snapshotLastIncluded = TermIndex.valueOf
+          (Long.parseLong(latest.getName().split("_")[0]), 
Long.parseLong(latest.getName().split("_")[1]));
+
+      List<FileInfo> fileInfos = new ArrayList<>();
+      for (File f : Objects.requireNonNull(latest.listFiles())) {
+        if (!f.getName().endsWith(".md5")) {
+          fileInfos.add(new FileInfo(f.toPath(), null));
         }
+      }
+
+      return new FileListSnapshotInfo(fileInfos, 
snapshotLastIncluded.getTerm(), snapshotLastIncluded.getIndex());
+    }
 
-        @Override
-        public SnapshotInfo getLatestSnapshot() {
-            if (!snapshotRoot.exists() || !file1.exists() || !file2.exists()) {
-                return null;
-            }
-            List<FileInfo> files = new ArrayList<>();
-            files.add(new FileInfo(
-                    file1.toPath(),
-                    null));
-            files.add(new FileInfo(
-                    file2.toPath(),
-                    null));
-            Assert.assertEquals(files.size(), 2);
-
-            SnapshotInfo info = super.getLatestSnapshot();
-            if (info == null) {
-                return null;
-            }
-            files.add(info.getFiles().get(0));
-            return new FileListSnapshotInfo(files,
-                    info.getTerm(), info.getIndex());
+    private Path[] getSortedSnapshotDirPaths() {
+      ArrayList<Path> snapshotPaths = new ArrayList<>();
+      try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(snapshotDir.toPath())) {
+        for (Path path : stream) {
+          if (path.toFile().isDirectory()) {
+            snapshotPaths.add(path);
+          }
         }
+      } catch (IOException exception) {
+        LOG.warn("cannot construct snapshot directory stream ", exception);
+        return null;
+      }
+
+      Path[] pathArray = snapshotPaths.toArray(new Path[0]);
+      Arrays.sort(
+          pathArray,
+          (o1, o2) -> {
+            String index1 = o1.toFile().getName().split("_")[1];
+            String index2 = o2.toFile().getName().split("_")[1];
+            return Long.compare(Long.parseLong(index1), 
Long.parseLong(index2));
+          });
+      return pathArray;
+    }
+
+    @Override
+    public SimpleStateMachineStorage getStateMachineStorage() {
+      return new SeparateSnapshotStateMachineStorage();
+    }
+
+    private class SeparateSnapshotStateMachineStorage extends 
SimpleStateMachineStorage {
+      @Override
+      public File getSnapshotDir() {
+        return snapshotDir;
+      }
+
+      @Override
+      public File getTmpDir() {
+        return tmpDir;
+      }
     }
-}
+  }
+}
\ No newline at end of file

Reply via email to