This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new b05b69ac9 RATIS-1734. Allow StateMachine to decide snapshot install
path (#774)
b05b69ac9 is described below
commit b05b69ac92348dd40a12d7c9fbb08f272d65759f
Author: William Song <[email protected]>
AuthorDate: Wed Nov 9 03:00:44 2022 +0800
RATIS-1734. Allow StateMachine to decide snapshot install path (#774)
---
.../ratis/statemachine/StateMachineStorage.java | 15 +-
.../org/apache/ratis/server/impl/ServerState.java | 2 +-
.../server/leader/InstallSnapshotRequests.java | 25 +-
.../ratis/server/storage/FileChunkReader.java | 10 +-
.../ratis/server/storage/SnapshotManager.java | 50 +--
.../ratis/InstallSnapshotFromLeaderTests.java | 373 ++++++++++++++-------
6 files changed, 321 insertions(+), 154 deletions(-)
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
index 86ffdf6db..f5853e1fb 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
@@ -17,10 +17,11 @@
*/
package org.apache.ratis.statemachine;
-import java.io.IOException;
-
import org.apache.ratis.server.storage.RaftStorage;
+import java.io.File;
+import java.io.IOException;
+
public interface StateMachineStorage {
void init(RaftStorage raftStorage) throws IOException;
@@ -38,4 +39,14 @@ public interface StateMachineStorage {
void format() throws IOException;
void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy)
throws IOException;
+
+ /** @return the directory where snapshots are installed, or null to use the default state machine directory of the Raft storage. */
+ default File getSnapshotDir() {
+ return null;
+ }
+
+ /** @return the temporary directory used while installing a snapshot, or null to use the default tmp directory of the Raft storage. */
+ default File getTmpDir() {
+ return null;
+ }
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 1ee2cab5e..4dba31756 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -146,7 +146,7 @@ class ServerState implements Closeable {
RaftServerConfigKeys.storageDir(prop));
}
- snapshotManager = new SnapshotManager(storage, id);
+ snapshotManager = new SnapshotManager(id, storage.getStorageDir(),
stateMachine.getStateMachineStorage());
// On start the leader is null, start the clock now
this.leaderId = null;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
index 38d1f9a2a..f52253b24 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
@@ -25,16 +25,21 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.storage.FileChunkReader;
import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.util.JavaUtils;
+import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.function.Function;
/**
* An {@link Iterable} of {@link InstallSnapshotRequestProto} for sending a
snapshot.
- *
+ * <p>
* The snapshot is sent by one or more requests, where
* a snapshot has one or more files, and
* a file is sent by one or more chunks.
@@ -43,6 +48,7 @@ import java.util.NoSuchElementException;
class InstallSnapshotRequests implements Iterable<InstallSnapshotRequestProto>
{
private final RaftServer.Division server;
private final RaftPeerId followerId;
+ private final Function<FileInfo, Path> getRelativePath;
/** The snapshot to be sent. */
private final SnapshotInfo snapshot;
@@ -71,6 +77,21 @@ class InstallSnapshotRequests implements
Iterable<InstallSnapshotRequestProto> {
this.snapshotChunkMaxSize = snapshotChunkMaxSize;
this.totalSize =
snapshot.getFiles().stream().mapToLong(FileInfo::getFileSize).reduce(Long::sum).orElseThrow(
() -> new IllegalStateException("Failed to compute total size for
snapshot " + snapshot));
+
+ final File snapshotDir =
server.getStateMachine().getStateMachineStorage().getSnapshotDir();
+ final Function<Path, Path> relativize;
+ if (snapshotDir != null) {
+ final Path dir = snapshotDir.toPath();
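+ // Prepend STATE_MACHINE_DIR_NAME for compatibility: the receiver resolves the file name against the storage root and relativizes it against the state machine dir.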
+ relativize = p -> new File(RaftStorageDirectory.STATE_MACHINE_DIR_NAME,
dir.relativize(p).toString()).toPath();
+ } else {
+ final Path dir =
server.getRaftStorage().getStorageDir().getRoot().toPath();
+ relativize = dir::relativize;
+ }
+ this.getRelativePath = info -> Optional.of(info.getPath())
+ .filter(Path::isAbsolute)
+ .map(relativize)
+ .orElseGet(info::getPath);
}
@Override
@@ -97,7 +118,7 @@ class InstallSnapshotRequests implements
Iterable<InstallSnapshotRequestProto> {
final FileInfo info = snapshot.getFiles().get(fileIndex);
try {
if (current == null) {
- current = new FileChunkReader(info,
server.getRaftStorage().getStorageDir());
+ current = new FileChunkReader(info, getRelativePath.apply(info));
}
final FileChunkProto chunk = current.readFileChunk(snapshotChunkMaxSize);
if (chunk.getDone()) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
index 15ed98157..a5ee66258 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
@@ -30,7 +30,6 @@ import java.io.InputStream;
import java.nio.file.Path;
import java.security.DigestInputStream;
import java.security.MessageDigest;
-import java.util.Optional;
/** Read {@link FileChunkProto}s from a file. */
public class FileChunkReader implements Closeable {
@@ -47,15 +46,12 @@ public class FileChunkReader implements Closeable {
* Construct a reader from a file specified by the given {@link FileInfo}.
*
* @param info the information of the file.
- * @param directory the directory where the file is stored.
+ * @param relativePath the relative path of the file.
* @throws IOException if it failed to open the file.
*/
- public FileChunkReader(FileInfo info, RaftStorageDirectory directory) throws
IOException {
+ public FileChunkReader(FileInfo info, Path relativePath) throws IOException {
this.info = info;
- this.relativePath = Optional.of(info.getPath())
- .filter(Path::isAbsolute)
- .map(p -> directory.getRoot().toPath().relativize(p))
- .orElse(info.getPath());
+ this.relativePath = relativePath;
final File f = info.getPath().toFile();
if (info.getFileDigest() == null) {
digester = MD5Hash.getDigester();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index aaa62a783..294f0a205 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -17,32 +17,36 @@
*/
package org.apache.ratis.server.storage;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.file.Path;
-import java.security.DigestOutputStream;
-import java.security.MessageDigest;
-import java.util.function.Supplier;
-
import org.apache.ratis.io.CorruptedFileException;
import org.apache.ratis.io.MD5Hash;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.proto.RaftProtos.FileChunkProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MD5FileUtil;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
/**
* Manage snapshots of a raft peer.
* TODO: snapshot should be treated as compaction log thus can be merged into
@@ -54,13 +58,23 @@ public class SnapshotManager {
private static final String CORRUPT = ".corrupt";
private static final String TMP = ".tmp";
- private final RaftStorage storage;
private final RaftPeerId selfId;
+
+ private final Supplier<File> snapshotDir;
+ private final Supplier<File> tmp;
+ private final Function<FileChunkProto, String> getRelativePath;
private final Supplier<MessageDigest> digester =
JavaUtils.memoize(MD5Hash::getDigester);
- public SnapshotManager(RaftStorage storage, RaftPeerId selfId) {
- this.storage = storage;
+ public SnapshotManager(RaftPeerId selfId, RaftStorageDirectory dir,
StateMachineStorage smStorage) {
this.selfId = selfId;
+ this.snapshotDir = MemoizedSupplier.valueOf(
+ () ->
Optional.ofNullable(smStorage.getSnapshotDir()).orElseGet(dir::getStateMachineDir));
+ this.tmp = MemoizedSupplier.valueOf(
+ () ->
Optional.ofNullable(smStorage.getTmpDir()).orElseGet(dir::getTmpDir));
+
+ final Path smDir = dir.getStateMachineDir().toPath();
+ this.getRelativePath =
+ c -> smDir.relativize(new File(dir.getRoot(),
c.getFilename()).toPath()).toString();
}
public void installSnapshot(StateMachine stateMachine,
@@ -68,10 +82,9 @@ public class SnapshotManager {
final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest =
request.getSnapshotChunk();
final long lastIncludedIndex =
snapshotChunkRequest.getTermIndex().getIndex();
- final RaftStorageDirectory dir = storage.getStorageDir();
// create a unique temporary directory
- final File tmpDir = new File(dir.getTmpDir(), "snapshot-" +
snapshotChunkRequest.getRequestId());
+ final File tmpDir = new File(tmp.get(), "snapshot-" +
snapshotChunkRequest.getRequestId());
FileUtils.createDirectories(tmpDir);
tmpDir.deleteOnExit();
@@ -81,7 +94,6 @@ public class SnapshotManager {
// TODO: Make sure that subsequent requests for the same installSnapshot
are coming in order,
// and are not lost when whole request cycle is done. Check requestId and
requestIndex here
- final Path stateMachineDir = dir.getStateMachineDir().toPath();
for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) {
SnapshotInfo pi = stateMachine.getLatestSnapshot();
if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
@@ -90,9 +102,7 @@ public class SnapshotManager {
+ " with endIndex >= lastIncludedIndex " + lastIncludedIndex);
}
- String fileName = chunk.getFilename(); // this is relative to the root
dir
- final Path relative = stateMachineDir.relativize(new File(dir.getRoot(),
fileName).toPath());
- final File tmpSnapshotFile = new File(tmpDir, relative.toString());
+ final File tmpSnapshotFile = new File(tmpDir,
getRelativePath.apply(chunk));
FileUtils.createDirectories(tmpSnapshotFile);
FileOutputStream out = null;
@@ -151,7 +161,7 @@ public class SnapshotManager {
}
if (snapshotChunkRequest.getDone()) {
- rename(tmpDir, dir.getStateMachineDir());
+ rename(tmpDir, snapshotDir.get());
}
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
index bc4de2ff4..c8e04bb00 100644
---
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -34,9 +34,9 @@ import
org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.FileListSnapshotInfo;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.LifeCycle;
-import org.apache.ratis.util.MD5FileUtil;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -44,142 +44,271 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends
MiniRaftCluster>
- extends BaseTest
- implements MiniRaftCluster.Factory.Get<CLUSTER> {
- static final Logger LOG =
LoggerFactory.getLogger(InstallSnapshotFromLeaderTests.class);
- {
- final RaftProperties prop = getProperties();
- prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
- StateMachineWithMultiNestedSnapshotFile.class,
StateMachine.class);
- RaftServerConfigKeys.Log.setPurgeGap(prop, PURGE_GAP);
- RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
- prop, SNAPSHOT_TRIGGER_THRESHOLD);
- RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, true);
+ extends BaseTest
+ implements MiniRaftCluster.Factory.Get<CLUSTER> {
+ static final Logger LOG =
LoggerFactory.getLogger(InstallSnapshotFromLeaderTests.class);
+
+ {
+ final RaftProperties prop = getProperties();
+ RaftServerConfigKeys.Log.setPurgeGap(prop, PURGE_GAP);
+ RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
+ prop, SNAPSHOT_TRIGGER_THRESHOLD);
+ RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, true);
+ }
+
+ private static final int SNAPSHOT_TRIGGER_THRESHOLD = 64;
+ private static final int PURGE_GAP = 8;
+
+ @Test
+ public void testMultiFileInstallSnapshot() throws Exception {
+ getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+ StateMachineWithMultiNestedSnapshotFile.class, StateMachine.class);
+ runWithNewCluster(1, this::testMultiFileInstallSnapshot);
+ }
+
+ @Test
+ public void testSeparateSnapshotInstallPath() throws Exception {
+ getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+ StateMachineWithSeparatedSnapshotPath.class, StateMachine.class);
+ runWithNewCluster(1, this::testMultiFileInstallSnapshot);
+ }
+
+ private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
+ try {
+ int i = 0;
+ RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = cluster.getLeader().getId();
+
+ try (final RaftClient client = cluster.createClient(leaderId)) {
+ for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+ RaftClientReply
+ reply = client.io().send(new RaftTestUtil.SimpleMessage("m" +
i));
+ Assert.assertTrue(reply.isSuccess());
+ }
+
+ client.getSnapshotManagementApi(leaderId).create(3000);
+ }
+
+ final SnapshotInfo snapshot =
cluster.getLeader().getStateMachine().getLatestSnapshot();
+ Assert.assertEquals(3, snapshot.getFiles().size());
+
+ // add two more peers
+ final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
+ true);
+ // trigger setConfiguration
+ cluster.setConfiguration(change.allPeersInNewConf);
+
+ RaftServerTestUtil
+ .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
+
+ // Check that the number of installed snapshot files on each follower
+ // matches the leader snapshot.
+ for (RaftServer.Division follower : cluster.getFollowers()) {
+ Assert.assertEquals(3,
follower.getStateMachine().getLatestSnapshot().getFiles().size());
+ }
+ } finally {
+ cluster.shutdown();
}
+ }
+
+ private static class StateMachineWithMultiNestedSnapshotFile extends
SimpleStateMachine4Testing {
+
+ File snapshotRoot;
+ File file1;
+ File file2;
+
+ @Override
+ public synchronized void initialize(RaftServer server, RaftGroupId
groupId, RaftStorage raftStorage) throws IOException {
+ super.initialize(server, groupId, raftStorage);
- private static final int SNAPSHOT_TRIGGER_THRESHOLD = 64;
- private static final int PURGE_GAP = 8;
+ // contains two snapshot files
+ // sm/snapshot/1.bin
+ // sm/snapshot/sub/2.bin
+ snapshotRoot = new File(getSMdir(), "snapshot");
+ FileUtils.deleteFully(snapshotRoot);
+ file1 = new File(snapshotRoot, "1.bin");
+ file2 = new File(new File(snapshotRoot, "sub"), "2.bin");
+ }
- @Test
- public void testMultiFileInstallSnapshot() throws Exception {
- runWithNewCluster(1, this::testMultiFileInstallSnapshot);
+ @Override
+ public synchronized void pause() {
+ if (getLifeCycle().getCurrentState() == LifeCycle.State.RUNNING) {
+ getLifeCycle().transition(LifeCycle.State.PAUSING);
+ getLifeCycle().transition(LifeCycle.State.PAUSED);
+ }
}
- private void testMultiFileInstallSnapshot(CLUSTER cluster) throws
Exception {
- try {
- int i = 0;
- RaftTestUtil.waitForLeader(cluster);
- final RaftPeerId leaderId = cluster.getLeader().getId();
-
- try (final RaftClient client = cluster.createClient(leaderId)) {
- for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
- RaftClientReply
- reply = client.io().send(new
RaftTestUtil.SimpleMessage("m" + i));
- Assert.assertTrue(reply.isSuccess());
- }
-
- client.getSnapshotManagementApi(leaderId).create(3000);
- }
-
- final SnapshotInfo snapshot =
cluster.getLeader().getStateMachine().getLatestSnapshot();
- Assert.assertEquals(3, snapshot.getFiles().size());
-
- // add two more peers
- final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2,
true,
- true);
- // trigger setConfiguration
- cluster.setConfiguration(change.allPeersInNewConf);
-
- RaftServerTestUtil
- .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0,
null);
-
- // Check the installed snapshot file number on each Follower
matches with the
- // leader snapshot.
- for (RaftServer.Division follower : cluster.getFollowers()) {
-
Assert.assertEquals(follower.getStateMachine().getLatestSnapshot().getFiles().size(),
3);
- }
- } finally {
- cluster.shutdown();
+ @Override
+ public long takeSnapshot() {
+ final TermIndex termIndex = getLastAppliedTermIndex();
+ if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) {
+ return RaftLog.INVALID_LOG_INDEX;
+ }
+
+ final long endIndex = termIndex.getIndex();
+ try {
+ if (!snapshotRoot.exists()) {
+ FileUtils.createDirectories(snapshotRoot);
+ FileUtils.createDirectories(file1.getParentFile());
+ FileUtils.createDirectories(file2.getParentFile());
+ FileUtils.createNewFile(file1.toPath());
+ FileUtils.createNewFile(file2.toPath());
}
+
+ } catch (IOException ioException) {
+ return RaftLog.INVALID_LOG_INDEX;
+ }
+
+ Assert.assertTrue(file1.exists());
+ Assert.assertTrue(file2.exists());
+ return super.takeSnapshot();
}
- private static class StateMachineWithMultiNestedSnapshotFile extends
SimpleStateMachine4Testing {
+ @Override
+ public SnapshotInfo getLatestSnapshot() {
+ if (!snapshotRoot.exists() || !file1.exists() || !file2.exists()) {
+ return null;
+ }
+ List<FileInfo> files = new ArrayList<>();
+ files.add(new FileInfo(
+ file1.toPath(),
+ null));
+ files.add(new FileInfo(
+ file2.toPath(),
+ null));
+ Assert.assertEquals(2, files.size());
- File snapshotRoot;
- File file1;
- File file2;
+ SnapshotInfo info = super.getLatestSnapshot();
+ if (info == null) {
+ return null;
+ }
+ files.add(info.getFiles().get(0));
+ return new FileListSnapshotInfo(files,
+ info.getTerm(), info.getIndex());
+ }
+ }
- @Override
- public synchronized void initialize(RaftServer server, RaftGroupId
groupId, RaftStorage raftStorage) throws IOException {
- super.initialize(server, groupId, raftStorage);
- // contains two snapshot files
- // sm/snapshot/1.bin
- // sm/snapshot/sub/2.bin
- snapshotRoot = new File(getSMdir(), "snapshot");
- file1 = new File(snapshotRoot, "1.bin");
- file2 = new File(new File(snapshotRoot, "sub"), "2.bin");
- }
+ private static class StateMachineWithSeparatedSnapshotPath extends
SimpleStateMachine4Testing {
+ private File root;
+ private File snapshotDir;
+ private File tmpDir;
- @Override
- public synchronized void pause() {
- if (getLifeCycle().getCurrentState() == LifeCycle.State.RUNNING) {
- getLifeCycle().transition(LifeCycle.State.PAUSING);
- getLifeCycle().transition(LifeCycle.State.PAUSED);
- }
- }
+ @Override
+ public synchronized void initialize(RaftServer server, RaftGroupId
groupId, RaftStorage raftStorage) throws IOException {
+ super.initialize(server, groupId, raftStorage);
+ this.root = new File("/tmp/ratis-tests/statemachine/" +
getId().toString());
+ this.snapshotDir = new File(root, "snapshot");
+ this.tmpDir = new File(root, "tmp");
+ FileUtils.deleteFully(root);
+ Assert.assertTrue(this.snapshotDir.mkdirs());
+ Assert.assertTrue(this.tmpDir.mkdirs());
+ this.root.deleteOnExit();
+ }
+
+ @Override
+ public synchronized void pause() {
+ if (getLifeCycle().getCurrentState() == LifeCycle.State.RUNNING) {
+ getLifeCycle().transition(LifeCycle.State.PAUSING);
+ getLifeCycle().transition(LifeCycle.State.PAUSED);
+ }
+ }
+
+ @Override
+ public long takeSnapshot() {
+ final TermIndex lastApplied = getLastAppliedTermIndex();
+ final File snapshotTmpDir = new File(tmpDir,
UUID.randomUUID().toString());
+ final File snapshotRealDir = new File(snapshotDir,
String.format("%d_%d", lastApplied.getTerm(), lastApplied.getIndex()));
+
+ try {
+ FileUtils.deleteFully(snapshotRealDir);
+ FileUtils.deleteFully(snapshotTmpDir);
+ Assert.assertTrue(snapshotTmpDir.mkdirs());
+ final File snapshotFile1 = new File(snapshotTmpDir, "deer");
+ final File snapshotFile2 = new File(snapshotTmpDir, "loves");
+ final File snapshotFile3 = new File(snapshotTmpDir, "vegetable");
+ Assert.assertTrue(snapshotFile1.createNewFile());
+ Assert.assertTrue(snapshotFile2.createNewFile());
+ Assert.assertTrue(snapshotFile3.createNewFile());
+ FileUtils.move(snapshotTmpDir, snapshotRealDir);
+ } catch (IOException ioe) {
+ LOG.error("create snapshot data file failed", ioe);
+ return RaftLog.INVALID_LOG_INDEX;
+ }
+
+ return lastApplied.getIndex();
+ }
+
+ @Override
+ public SnapshotInfo getLatestSnapshot() {
+ Path[] sortedSnapshots = getSortedSnapshotDirPaths();
+ if (sortedSnapshots == null || sortedSnapshots.length == 0) {
+ return null;
+ }
- @Override
- public long takeSnapshot() {
- final TermIndex termIndex = getLastAppliedTermIndex();
- if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) {
- return RaftLog.INVALID_LOG_INDEX;
- }
-
- final long endIndex = termIndex.getIndex();
- try {
- if (!snapshotRoot.exists()) {
- FileUtils.createDirectories(snapshotRoot);
- FileUtils.createDirectories(file1.getParentFile());
- FileUtils.createDirectories(file2.getParentFile());
- FileUtils.createNewFile(file1.toPath());
- FileUtils.createNewFile(file2.toPath());
- }
-
- } catch (IOException ioException) {
- return RaftLog.INVALID_LOG_INDEX;
- }
-
- Assert.assertTrue(file1.exists());
- Assert.assertTrue(file2.exists());
- return super.takeSnapshot();
+ File latest = sortedSnapshots[sortedSnapshots.length - 1].toFile();
+ TermIndex snapshotLastIncluded = TermIndex.valueOf
+ (Long.parseLong(latest.getName().split("_")[0]),
Long.parseLong(latest.getName().split("_")[1]));
+
+ List<FileInfo> fileInfos = new ArrayList<>();
+ for (File f : Objects.requireNonNull(latest.listFiles())) {
+ if (!f.getName().endsWith(".md5")) {
+ fileInfos.add(new FileInfo(f.toPath(), null));
}
+ }
+
+ return new FileListSnapshotInfo(fileInfos,
snapshotLastIncluded.getTerm(), snapshotLastIncluded.getIndex());
+ }
- @Override
- public SnapshotInfo getLatestSnapshot() {
- if (!snapshotRoot.exists() || !file1.exists() || !file2.exists()) {
- return null;
- }
- List<FileInfo> files = new ArrayList<>();
- files.add(new FileInfo(
- file1.toPath(),
- null));
- files.add(new FileInfo(
- file2.toPath(),
- null));
- Assert.assertEquals(files.size(), 2);
-
- SnapshotInfo info = super.getLatestSnapshot();
- if (info == null) {
- return null;
- }
- files.add(info.getFiles().get(0));
- return new FileListSnapshotInfo(files,
- info.getTerm(), info.getIndex());
+ private Path[] getSortedSnapshotDirPaths() {
+ ArrayList<Path> snapshotPaths = new ArrayList<>();
+ try (DirectoryStream<Path> stream =
Files.newDirectoryStream(snapshotDir.toPath())) {
+ for (Path path : stream) {
+ if (path.toFile().isDirectory()) {
+ snapshotPaths.add(path);
+ }
}
+ } catch (IOException exception) {
+ LOG.warn("cannot construct snapshot directory stream ", exception);
+ return null;
+ }
+
+ Path[] pathArray = snapshotPaths.toArray(new Path[0]);
+ Arrays.sort(
+ pathArray,
+ (o1, o2) -> {
+ String index1 = o1.toFile().getName().split("_")[1];
+ String index2 = o2.toFile().getName().split("_")[1];
+ return Long.compare(Long.parseLong(index1),
Long.parseLong(index2));
+ });
+ return pathArray;
+ }
+
+ @Override
+ public SimpleStateMachineStorage getStateMachineStorage() {
+ return new SeparateSnapshotStateMachineStorage();
+ }
+
+ private class SeparateSnapshotStateMachineStorage extends
SimpleStateMachineStorage {
+ @Override
+ public File getSnapshotDir() {
+ return snapshotDir;
+ }
+
+ @Override
+ public File getTmpDir() {
+ return tmpDir;
+ }
}
-}
+ }
+}
\ No newline at end of file