This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit afba5dafc2a8cadfea0c03566cb4c6f7bcc20369
Author: William Song <[email protected]>
AuthorDate: Wed Jun 8 12:39:55 2022 +0800

    RATIS-1587. Fix snapshot multi-chunk bug & support snapshot hierarchy (#655)
    
    (cherry picked from commit 3c4e6f15521d6c0219956225c78154967856adab)
---
 ratis-proto/src/main/proto/Grpc.proto              |   2 +-
 .../ratis/server/storage/SnapshotManager.java      |  11 +-
 .../ratis/InstallSnapshotFromLeaderTests.java      | 175 +++++++++++++++++++++
 .../ratis/grpc/TestLeaderInstallSnapshot.java      |  24 +++
 4 files changed, 206 insertions(+), 6 deletions(-)

diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto
index 2e348f20..06af061e 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -44,7 +44,7 @@ service RaftServerProtocolService {
       returns(stream ratis.common.AppendEntriesReplyProto) {}
 
   rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto)
-      returns(ratis.common.InstallSnapshotReplyProto) {}
+      returns(stream ratis.common.InstallSnapshotReplyProto) {}
 }
 
 service AdminProtocolService {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index 79ec73bc..8b5c6396 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -21,7 +21,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
-import java.util.UUID;
+import java.nio.file.Path;
 
 import org.apache.ratis.io.CorruptedFileException;
 import org.apache.ratis.io.MD5Hash;
@@ -65,7 +65,7 @@ public class SnapshotManager {
     final RaftStorageDirectory dir = storage.getStorageDir();
 
     // create a unique temporary directory
-    final File tmpDir =  new File(dir.getTmpDir(), 
UUID.randomUUID().toString());
+    final File tmpDir =  new File(dir.getTmpDir(), "snapshot-" + 
snapshotChunkRequest.getRequestId());
     FileUtils.createDirectories(tmpDir);
     tmpDir.deleteOnExit();
 
@@ -74,6 +74,7 @@ public class SnapshotManager {
     // TODO: Make sure that subsequent requests for the same installSnapshot 
are coming in order,
     // and are not lost when whole request cycle is done. Check requestId and 
requestIndex here
 
+    final Path stateMachineDir = dir.getStateMachineDir().toPath();
     for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) {
       SnapshotInfo pi = stateMachine.getLatestSnapshot();
       if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
@@ -83,9 +84,9 @@ public class SnapshotManager {
       }
 
       String fileName = chunk.getFilename(); // this is relative to the root 
dir
-      // TODO: assumes flat layout inside SM dir
-      File tmpSnapshotFile = new File(tmpDir,
-          new File(dir.getRoot(), fileName).getName());
+      final Path relative = stateMachineDir.relativize(new File(dir.getRoot(), 
fileName).toPath());
+      final File tmpSnapshotFile = new File(tmpDir, relative.toString());
+      FileUtils.createDirectories(tmpSnapshotFile);
 
       FileOutputStream out = null;
       try {
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
new file mode 100644
index 00000000..b4cad6a4
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.FileListSnapshotInfo;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.MD5FileUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests that a leader can install a snapshot consisting of several files, some of them in
+ * nested sub-directories of the state machine directory, on newly added peers.
+ *
+ * @param <CLUSTER> the {@link MiniRaftCluster} implementation under test
+ */
+public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftCluster>
+        extends BaseTest
+        implements MiniRaftCluster.Factory.Get<CLUSTER> {
+    static final Logger LOG = LoggerFactory.getLogger(InstallSnapshotFromLeaderTests.class);
+
+    private static final int SNAPSHOT_TRIGGER_THRESHOLD = 64;
+    private static final int PURGE_GAP = 8;
+    /** The two nested files of the test state machine plus the file of the base state machine. */
+    private static final int SNAPSHOT_FILE_NUM = 3;
+
+    // Configure every cluster created by this test: use the nested-snapshot state machine,
+    // snapshot automatically every SNAPSHOT_TRIGGER_THRESHOLD entries and purge the log with a
+    // small gap (intended to make new peers catch up through snapshot installation).
+    {
+        final RaftProperties prop = getProperties();
+        prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+                StateMachineWithMultiNestedSnapshotFile.class, StateMachine.class);
+        RaftServerConfigKeys.Log.setPurgeGap(prop, PURGE_GAP);
+        RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(prop, SNAPSHOT_TRIGGER_THRESHOLD);
+        RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, true);
+    }
+
+    @Test
+    public void testMultiFileInstallSnapshot() throws Exception {
+        runWithNewCluster(1, this::testMultiFileInstallSnapshot);
+    }
+
+    /**
+     * Writes enough entries to trigger snapshots, takes a snapshot on the leader, adds two peers
+     * and verifies that every follower ends up with as many snapshot files as the leader.
+     */
+    private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
+        try {
+            RaftTestUtil.waitForLeader(cluster);
+            final RaftPeerId leaderId = cluster.getLeader().getId();
+
+            try (final RaftClient client = cluster.createClient(leaderId)) {
+                for (int i = 0; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+                    final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+                    Assert.assertTrue(reply.isSuccess());
+                }
+
+                client.getSnapshotManagementApi(leaderId).create(3000);
+            }
+
+            final SnapshotInfo snapshot = cluster.getLeader().getStateMachine().getLatestSnapshot();
+            Assert.assertNotNull("leader has no snapshot", snapshot);
+            Assert.assertEquals(SNAPSHOT_FILE_NUM, snapshot.getFiles().size());
+
+            // add two more peers
+            final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true, true);
+            // trigger setConfiguration
+            cluster.setConfiguration(change.allPeersInNewConf);
+
+            RaftServerTestUtil.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
+
+            // Check the installed snapshot file number on each Follower matches with the
+            // leader snapshot.
+            for (RaftServer.Division follower : cluster.getFollowers()) {
+                final SnapshotInfo followerSnapshot = follower.getStateMachine().getLatestSnapshot();
+                Assert.assertNotNull("follower " + follower.getId() + " has no snapshot", followerSnapshot);
+                Assert.assertEquals(SNAPSHOT_FILE_NUM, followerSnapshot.getFiles().size());
+            }
+        } finally {
+            cluster.shutdown();
+        }
+    }
+
+    /**
+     * A {@link SimpleStateMachine4Testing} whose snapshot additionally contains two empty files
+     * laid out in nested directories under the state machine directory:
+     * <pre>
+     *   sm/snapshot/1.bin
+     *   sm/snapshot/sub/2.bin
+     * </pre>
+     */
+    private static class StateMachineWithMultiNestedSnapshotFile extends SimpleStateMachine4Testing {
+        // The paths are resolved lazily: getSMdir() may still be unset while instance fields are
+        // initialized, and new File(null, child) silently resolves against the working directory,
+        // which every peer in the mini cluster shares.
+        private File getSnapshotRoot() {
+            return new File(getSMdir(), "snapshot");
+        }
+
+        private File getFile1() {
+            return new File(getSnapshotRoot(), "1.bin");
+        }
+
+        private File getFile2() {
+            return new File(new File(getSnapshotRoot(), "sub"), "2.bin");
+        }
+
+        /** Moves the life cycle from RUNNING to PAUSED (via PAUSING); does nothing in other states. */
+        @Override
+        public synchronized void pause() {
+            if (getLifeCycle().getCurrentState() == LifeCycle.State.RUNNING) {
+                getLifeCycle().transition(LifeCycle.State.PAUSING);
+                getLifeCycle().transition(LifeCycle.State.PAUSED);
+            }
+        }
+
+        /**
+         * Creates the nested snapshot files if they are missing and then delegates to the base
+         * implementation.
+         *
+         * @return the result of the base implementation, or {@link RaftLog#INVALID_LOG_INDEX} if
+         *         nothing has been applied yet or the nested files could not be created
+         */
+        @Override
+        public long takeSnapshot() {
+            final TermIndex termIndex = getLastAppliedTermIndex();
+            if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) {
+                return RaftLog.INVALID_LOG_INDEX;
+            }
+
+            final File file1 = getFile1();
+            final File file2 = getFile2();
+            try {
+                createIfAbsent(file1);
+                createIfAbsent(file2);
+            } catch (IOException e) {
+                InstallSnapshotFromLeaderTests.LOG.warn("Failed to create snapshot files {} and {}", file1, file2, e);
+                return RaftLog.INVALID_LOG_INDEX;
+            }
+
+            Assert.assertTrue(file1.exists());
+            Assert.assertTrue(file2.exists());
+            return super.takeSnapshot();
+        }
+
+        /** Creates {@code file}, together with its parent directories, unless it already exists. */
+        private static void createIfAbsent(File file) throws IOException {
+            if (!file.exists()) {
+                FileUtils.createDirectories(file.getParentFile());
+                FileUtils.createNewFile(file.toPath());
+            }
+        }
+
+        /**
+         * Returns the base snapshot extended with the two nested files.
+         *
+         * @return {@code null} if either nested file or the base snapshot is missing
+         * @throws AssertionError if the MD5 digest of a nested file cannot be computed
+         */
+        @Override
+        public SnapshotInfo getLatestSnapshot() {
+            final File file1 = getFile1();
+            final File file2 = getFile2();
+            if (!file1.exists() || !file2.exists()) {
+                return null;
+            }
+            final List<FileInfo> files = new ArrayList<>();
+            try {
+                files.add(new FileInfo(file1.toPath(), MD5FileUtil.computeMd5ForFile(file1)));
+                files.add(new FileInfo(file2.toPath(), MD5FileUtil.computeMd5ForFile(file2)));
+            } catch (IOException e) {
+                // Fail with the root cause instead of swallowing it.
+                throw new AssertionError("Failed to compute MD5 of " + file1 + " or " + file2, e);
+            }
+
+            final SnapshotInfo info = super.getLatestSnapshot();
+            if (info == null) {
+                return null;
+            }
+            files.add(info.getFiles().get(0));
+            return new FileListSnapshotInfo(files, info.getTerm(), info.getIndex());
+        }
+    }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
new file mode 100644
index 00000000..ee646941
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.InstallSnapshotFromLeaderTests;
+
+/** Runs {@link InstallSnapshotFromLeaderTests} with {@link MiniRaftClusterWithGrpc} clusters. */
+public class TestLeaderInstallSnapshot
+    extends InstallSnapshotFromLeaderTests<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
+}

Reply via email to