This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 3c4e6f15 RATIS-1587. Fix snapshot multi-chunk bug & support snapshot hierarchy (#655)
3c4e6f15 is described below
commit 3c4e6f15521d6c0219956225c78154967856adab
Author: William Song <[email protected]>
AuthorDate: Wed Jun 8 12:39:55 2022 +0800
RATIS-1587. Fix snapshot multi-chunk bug & support snapshot hierarchy (#655)
---
ratis-proto/src/main/proto/Grpc.proto | 2 +-
.../ratis/server/storage/SnapshotManager.java | 11 +-
.../ratis/InstallSnapshotFromLeaderTests.java | 175 +++++++++++++++++++++
.../ratis/grpc/TestLeaderInstallSnapshot.java | 24 +++
4 files changed, 206 insertions(+), 6 deletions(-)
diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto
index 2e348f20..06af061e 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -44,7 +44,7 @@ service RaftServerProtocolService {
returns(stream ratis.common.AppendEntriesReplyProto) {}
rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto)
- returns(ratis.common.InstallSnapshotReplyProto) {}
+ returns(stream ratis.common.InstallSnapshotReplyProto) {}
}
service AdminProtocolService {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index 79ec73bc..8b5c6396 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -21,7 +21,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
-import java.util.UUID;
+import java.nio.file.Path;
import org.apache.ratis.io.CorruptedFileException;
import org.apache.ratis.io.MD5Hash;
@@ -65,7 +65,7 @@ public class SnapshotManager {
final RaftStorageDirectory dir = storage.getStorageDir();
// create a unique temporary directory
- final File tmpDir = new File(dir.getTmpDir(),
UUID.randomUUID().toString());
+ final File tmpDir = new File(dir.getTmpDir(), "snapshot-" +
snapshotChunkRequest.getRequestId());
FileUtils.createDirectories(tmpDir);
tmpDir.deleteOnExit();
@@ -74,6 +74,7 @@ public class SnapshotManager {
// TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
// and are not lost when whole request cycle is done. Check requestId and requestIndex here
+ final Path stateMachineDir = dir.getStateMachineDir().toPath();
for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) {
SnapshotInfo pi = stateMachine.getLatestSnapshot();
if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
@@ -83,9 +84,9 @@ public class SnapshotManager {
}
String fileName = chunk.getFilename(); // this is relative to the root
dir
- // TODO: assumes flat layout inside SM dir
- File tmpSnapshotFile = new File(tmpDir,
- new File(dir.getRoot(), fileName).getName());
+ final Path relative = stateMachineDir.relativize(new File(dir.getRoot(),
fileName).toPath());
+ final File tmpSnapshotFile = new File(tmpDir, relative.toString());
+ FileUtils.createDirectories(tmpSnapshotFile);
FileOutputStream out = null;
try {
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
new file mode 100644
index 00000000..b4cad6a4
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.FileListSnapshotInfo;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.MD5FileUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends
MiniRaftCluster>
+ extends BaseTest
+ implements MiniRaftCluster.Factory.Get<CLUSTER> {
+ static final Logger LOG =
LoggerFactory.getLogger(InstallSnapshotFromLeaderTests.class);
+ {
+ final RaftProperties prop = getProperties();
+ prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+ StateMachineWithMultiNestedSnapshotFile.class,
StateMachine.class);
+ RaftServerConfigKeys.Log.setPurgeGap(prop, PURGE_GAP);
+ RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
+ prop, SNAPSHOT_TRIGGER_THRESHOLD);
+ RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, true);
+ }
+
+ private static final int SNAPSHOT_TRIGGER_THRESHOLD = 64;
+ private static final int PURGE_GAP = 8;
+
+ @Test
+ public void testMultiFileInstallSnapshot() throws Exception {
+ runWithNewCluster(1, this::testMultiFileInstallSnapshot);
+ }
+
+ private void testMultiFileInstallSnapshot(CLUSTER cluster) throws
Exception {
+ try {
+ int i = 0;
+ RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = cluster.getLeader().getId();
+
+ try (final RaftClient client = cluster.createClient(leaderId)) {
+ for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+ RaftClientReply
+ reply = client.io().send(new
RaftTestUtil.SimpleMessage("m" + i));
+ Assert.assertTrue(reply.isSuccess());
+ }
+
+ client.getSnapshotManagementApi(leaderId).create(3000);
+ }
+
+ final SnapshotInfo snapshot =
cluster.getLeader().getStateMachine().getLatestSnapshot();
+ Assert.assertEquals(3, snapshot.getFiles().size());
+
+ // add two more peers
+ final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2,
true,
+ true);
+ // trigger setConfiguration
+ cluster.setConfiguration(change.allPeersInNewConf);
+
+ RaftServerTestUtil
+ .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0,
null);
+
+ // Check the installed snapshot file number on each Follower matches with the
+ // leader snapshot.
+ for (RaftServer.Division follower : cluster.getFollowers()) {
+
Assert.assertEquals(follower.getStateMachine().getLatestSnapshot().getFiles().size(),
3);
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ private static class StateMachineWithMultiNestedSnapshotFile extends
SimpleStateMachine4Testing {
+ // contains two snapshot files
+ // sm/snapshot/1.bin
+ // sm/snapshot/sub/2.bin
+ final File snapshotRoot = new File(getSMdir(), "snapshot");
+ final File file1 = new File(snapshotRoot, "1.bin");
+ final File file2 = new File(new File(snapshotRoot, "sub"), "2.bin");
+
+ @Override
+ public synchronized void pause() {
+ if (getLifeCycle().getCurrentState() == LifeCycle.State.RUNNING) {
+ getLifeCycle().transition(LifeCycle.State.PAUSING);
+ getLifeCycle().transition(LifeCycle.State.PAUSED);
+ }
+ }
+
+ @Override
+ public long takeSnapshot() {
+ final TermIndex termIndex = getLastAppliedTermIndex();
+ if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) {
+ return RaftLog.INVALID_LOG_INDEX;
+ }
+
+ final long endIndex = termIndex.getIndex();
+ try {
+ if (!snapshotRoot.exists()) {
+ FileUtils.createDirectories(snapshotRoot);
+ FileUtils.createDirectories(file1.getParentFile());
+ FileUtils.createDirectories(file2.getParentFile());
+ FileUtils.createNewFile(file1.toPath());
+ FileUtils.createNewFile(file2.toPath());
+ }
+
+ } catch (IOException ioException) {
+ return RaftLog.INVALID_LOG_INDEX;
+ }
+
+ Assert.assertTrue(file1.exists());
+ Assert.assertTrue(file2.exists());
+ return super.takeSnapshot();
+ }
+
+ @Override
+ public SnapshotInfo getLatestSnapshot() {
+ if (!snapshotRoot.exists() || !file1.exists() || !file2.exists()) {
+ return null;
+ }
+ List<FileInfo> files = new ArrayList<>();
+ try {
+ files.add(new FileInfo(
+ file1.toPath(),
+ MD5FileUtil.computeMd5ForFile(file1)));
+ files.add(new FileInfo(
+ file2.toPath(),
+ MD5FileUtil.computeMd5ForFile(file2)));
+ } catch (IOException ignored) {}
+ Assert.assertEquals(files.size(), 2);
+
+ SnapshotInfo info = super.getLatestSnapshot();
+ if (info == null) {
+ return null;
+ }
+ files.add(info.getFiles().get(0));
+ return new FileListSnapshotInfo(files,
+ info.getTerm(), info.getIndex());
+ }
+ }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
new file mode 100644
index 00000000..ee646941
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.InstallSnapshotFromLeaderTests;
+
+public class TestLeaderInstallSnapshot
+extends InstallSnapshotFromLeaderTests<MiniRaftClusterWithGrpc>
+implements MiniRaftClusterWithGrpc.FactoryGet {}