This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit afba5dafc2a8cadfea0c03566cb4c6f7bcc20369 Author: William Song <[email protected]> AuthorDate: Wed Jun 8 12:39:55 2022 +0800 RATIS-1587. Fix snapshot multi-chunk bug & support snapshot hierarchy (#655) (cherry picked from commit 3c4e6f15521d6c0219956225c78154967856adab) --- ratis-proto/src/main/proto/Grpc.proto | 2 +- .../ratis/server/storage/SnapshotManager.java | 11 +- .../ratis/InstallSnapshotFromLeaderTests.java | 175 +++++++++++++++++++++ .../ratis/grpc/TestLeaderInstallSnapshot.java | 24 +++ 4 files changed, 206 insertions(+), 6 deletions(-) diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto index 2e348f20..06af061e 100644 --- a/ratis-proto/src/main/proto/Grpc.proto +++ b/ratis-proto/src/main/proto/Grpc.proto @@ -44,7 +44,7 @@ service RaftServerProtocolService { returns(stream ratis.common.AppendEntriesReplyProto) {} rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto) - returns(ratis.common.InstallSnapshotReplyProto) {} + returns(stream ratis.common.InstallSnapshotReplyProto) {} } service AdminProtocolService { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java index 79ec73bc..8b5c6396 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java @@ -21,7 +21,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.channels.FileChannel; -import java.util.UUID; +import java.nio.file.Path; import org.apache.ratis.io.CorruptedFileException; import org.apache.ratis.io.MD5Hash; @@ -65,7 +65,7 @@ public class SnapshotManager { final RaftStorageDirectory dir = storage.getStorageDir(); // create a unique temporary directory - final File tmpDir = new File(dir.getTmpDir(), UUID.randomUUID().toString()); + final File tmpDir = new File(dir.getTmpDir(), "snapshot-" + snapshotChunkRequest.getRequestId()); FileUtils.createDirectories(tmpDir); tmpDir.deleteOnExit(); @@ -74,6 +74,7 @@ public class SnapshotManager { // TODO: Make sure that subsequent requests for the same installSnapshot are coming in order, // and are not lost when whole request cycle is done. Check requestId and requestIndex here + final Path stateMachineDir = dir.getStateMachineDir().toPath(); for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) { SnapshotInfo pi = stateMachine.getLatestSnapshot(); if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) { @@ -83,9 +84,9 @@ public class SnapshotManager { } String fileName = chunk.getFilename(); // this is relative to the root dir - // TODO: assumes flat layout inside SM dir - File tmpSnapshotFile = new File(tmpDir, - new File(dir.getRoot(), fileName).getName()); + final Path relative = stateMachineDir.relativize(new File(dir.getRoot(), fileName).toPath()); + final File tmpSnapshotFile = new File(tmpDir, relative.toString()); + FileUtils.createDirectories(tmpSnapshotFile); FileOutputStream out = null; try { diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java new file mode 100644 index 00000000..b4cad6a4 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.MiniRaftCluster; +import org.apache.ratis.server.impl.RaftServerTestUtil; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.raftlog.RaftLog; +import org.apache.ratis.server.storage.FileInfo; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.statemachine.impl.FileListSnapshotInfo; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.LifeCycle; +import org.apache.ratis.util.MD5FileUtil; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftCluster> + extends BaseTest + implements MiniRaftCluster.Factory.Get<CLUSTER> { + static final Logger LOG = LoggerFactory.getLogger(InstallSnapshotFromLeaderTests.class); + { + final RaftProperties prop = getProperties(); + prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + StateMachineWithMultiNestedSnapshotFile.class, StateMachine.class); + RaftServerConfigKeys.Log.setPurgeGap(prop, PURGE_GAP); + RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold( + prop, SNAPSHOT_TRIGGER_THRESHOLD); + RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, true); + } + + private static final int SNAPSHOT_TRIGGER_THRESHOLD = 64; + private static final int PURGE_GAP = 8; + + @Test + public void testMultiFileInstallSnapshot() throws Exception { + runWithNewCluster(1, this::testMultiFileInstallSnapshot); + } + + private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception { + try { + int i = 0; + RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = cluster.getLeader().getId(); + + try (final RaftClient client = cluster.createClient(leaderId)) { + for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { + RaftClientReply + reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i)); + Assert.assertTrue(reply.isSuccess()); + } + + client.getSnapshotManagementApi(leaderId).create(3000); + } + + final SnapshotInfo snapshot = cluster.getLeader().getStateMachine().getLatestSnapshot(); + Assert.assertEquals(3, snapshot.getFiles().size()); + + // add two more peers + final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true, + true); + // trigger setConfiguration + cluster.setConfiguration(change.allPeersInNewConf); + + RaftServerTestUtil + .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); + + // Check the installed snapshot file number on each Follower matches with the + // leader snapshot. + for (RaftServer.Division follower : cluster.getFollowers()) { + Assert.assertEquals(follower.getStateMachine().getLatestSnapshot().getFiles().size(), 3); + } + } finally { + cluster.shutdown(); + } + } + + private static class StateMachineWithMultiNestedSnapshotFile extends SimpleStateMachine4Testing { + // contains two snapshot files + // sm/snapshot/1.bin + // sm/snapshot/sub/2.bin + final File snapshotRoot = new File(getSMdir(), "snapshot"); + final File file1 = new File(snapshotRoot, "1.bin"); + final File file2 = new File(new File(snapshotRoot, "sub"), "2.bin"); + + @Override + public synchronized void pause() { + if (getLifeCycle().getCurrentState() == LifeCycle.State.RUNNING) { + getLifeCycle().transition(LifeCycle.State.PAUSING); + getLifeCycle().transition(LifeCycle.State.PAUSED); + } + } + + @Override + public long takeSnapshot() { + final TermIndex termIndex = getLastAppliedTermIndex(); + if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) { + return RaftLog.INVALID_LOG_INDEX; + } + + final long endIndex = termIndex.getIndex(); + try { + if (!snapshotRoot.exists()) { + FileUtils.createDirectories(snapshotRoot); + FileUtils.createDirectories(file1.getParentFile()); + FileUtils.createDirectories(file2.getParentFile()); + FileUtils.createNewFile(file1.toPath()); + FileUtils.createNewFile(file2.toPath()); + } + + } catch (IOException ioException) { + return RaftLog.INVALID_LOG_INDEX; + } + + Assert.assertTrue(file1.exists()); + Assert.assertTrue(file2.exists()); + return super.takeSnapshot(); + } + + @Override + public SnapshotInfo getLatestSnapshot() { + if (!snapshotRoot.exists() || !file1.exists() || !file2.exists()) { + return null; + } + List<FileInfo> files = new ArrayList<>(); + try { + files.add(new FileInfo( + file1.toPath(), + MD5FileUtil.computeMd5ForFile(file1))); + files.add(new FileInfo( + file2.toPath(), + MD5FileUtil.computeMd5ForFile(file2))); + } catch (IOException ignored) {} + Assert.assertEquals(files.size(), 2); + + SnapshotInfo info = super.getLatestSnapshot(); + if (info == null) { + return null; + } + files.add(info.getFiles().get(0)); + return new FileListSnapshotInfo(files, + info.getTerm(), info.getIndex()); + } + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java new file mode 100644 index 00000000..ee646941 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.InstallSnapshotFromLeaderTests; + +public class TestLeaderInstallSnapshot +extends InstallSnapshotFromLeaderTests<MiniRaftClusterWithGrpc> +implements MiniRaftClusterWithGrpc.FactoryGet {}
