This is an automated email from the ASF dual-hosted git repository. williamsong pushed a commit to branch snapshot-3 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 2fddd52ae25e5f1c263e73f3c2d1c63e918a69d8 Author: szywilliam <[email protected]> AuthorDate: Thu May 16 10:25:32 2024 +0800 reply ALREADY_INSTALLED --- .../ratis/server/impl/ConfigurationManager.java | 4 +- .../org/apache/ratis/server/impl/ServerState.java | 4 +- .../server/impl/SnapshotInstallationHandler.java | 7 ++- .../ratis/InstallSnapshotFromLeaderTests.java | 70 ++++++++++++++++++++++ .../test/java/org/apache/ratis/RaftTestUtil.java | 14 +++++ .../ratis/server/impl/LeaderElectionTests.java | 38 ++++-------- .../ratis/grpc/TestLeaderInstallSnapshot.java | 6 ++ 7 files changed, 111 insertions(+), 32 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java index 10c59c8b1..714177a3e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java @@ -61,10 +61,10 @@ public class ConfigurationManager { } } - synchronized void addConfiguration(RaftConfiguration conf) { + synchronized void addConfiguration(RaftConfiguration conf, long commitIndex) { final long logIndex = conf.getLogEntryIndex(); final RaftConfiguration found = configurations.get(logIndex); - if (found != null) { + if (found != null && logIndex <= commitIndex) { Preconditions.assertTrue(found.equals(conf)); return; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 0f46c6b52..285dc16d5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -377,7 +377,9 @@ class ServerState { } void setRaftConf(RaftConfiguration conf) { - configurationManager.addConfiguration(conf); + final long lastCommittedIndex = server.getState().log.isInitialized() ? + server.getRaftLog().getLastCommittedIndex() : RaftLog.INVALID_LOG_INDEX; + configurationManager.addConfiguration(conf, lastCommittedIndex); server.getServerRpc().addRaftPeers(conf.getAllPeers()); final Collection<RaftPeer> listeners = conf.getAllPeers(RaftPeerRole.LISTENER); if (!listeners.isEmpty()) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index f03e2d883..2f3b3ba7a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -175,9 +175,10 @@ class SnapshotInstallationHandler { // Check and append the snapshot chunk. We simply put this in lock // considering a follower peer requiring a snapshot installation does not // have a lot of requests - Preconditions.assertTrue(state.getLog().getLastCommittedIndex() < lastIncludedIndex, - "%s log's commit index is %s, last included index in snapshot is %s", - getMemberId(), state.getLog().getLastCommittedIndex(), lastIncludedIndex); + if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) { + return toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED); + } //TODO: We should only update State with installed snapshot once the request is done. state.installSnapshot(request); diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java index 9d6a2f183..e699b79e9 100644 --- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java @@ -21,7 +21,10 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.exceptions.RaftRetryFailureException; +import org.apache.ratis.retry.RetryPolicies; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.MiniRaftCluster; @@ -85,6 +88,12 @@ public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftClu runWithNewCluster(1, this::testMultiFileInstallSnapshot); } + public void testInstallSnapshotLeaderSwitch() throws Exception { + getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + StateMachineWithSeparatedSnapshotPath.class, StateMachine.class); + runWithNewCluster(3, this::testInstallSnapshotDuringLeaderSwitch); + } + private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception { try { int i = 0; @@ -127,6 +136,67 @@ public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftClu } } + private void testInstallSnapshotDuringLeaderSwitch(CLUSTER cluster) throws Exception { + try { + RaftTestUtil.waitForLeader(cluster); + final RaftPeerId leaderId = cluster.getLeader().getId(); + + // perform operations and force all peers to take snapshot + try (final RaftClient client = cluster.createClient(leaderId)) { + for (int i = 0; i < SNAPSHOT_TRIGGER_THRESHOLD * 2; i++) { + final RaftClientReply + reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i)); + Assertions.assertTrue(reply.isSuccess()); + } + + for (final RaftPeer peer: cluster.getPeers()) { + final RaftClientReply snapshotReply = client.getSnapshotManagementApi(leaderId).create(3000); + Assertions.assertTrue(snapshotReply.isSuccess()); + } + } + final SnapshotInfo snapshot = cluster.getLeader().getStateMachine().getLatestSnapshot(); + Assertions.assertNotNull(snapshot); + + // isolate two followers (majority) in old configuration + final List<RaftServer.Division> oldFollowers = cluster.getFollowers(); + for (RaftServer.Division f: oldFollowers) { + RaftTestUtil.isolate(cluster, f.getId()); + } + + // add two more peers and install snapshot from leaders + final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true, + true); + try (final RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) { + Assertions.assertThrows(RaftRetryFailureException.class, + () -> client.admin().setConfiguration(change.allPeersInNewConf)); + } + + final SnapshotInfo snapshotInfo = cluster.getDivision(change.newPeers[0].getId()) + .getStateMachine().getLatestSnapshot(); + Assertions.assertNotNull(snapshotInfo); + + // recover the old followers and isolate the leader to force leader switch + RaftTestUtil.isolate(cluster, leaderId); + for (RaftServer.Division f: oldFollowers) { + RaftTestUtil.deIsolate(cluster, f.getId()); + } + RaftTestUtil.waitForLeader(cluster); + + try (final RaftClient client = cluster.createClient(cluster.getLeader().getId())) { + // successfully setConfiguration during leader switch + final RaftClientReply setConf = client.admin().setConfiguration(change.allPeersInNewConf); + Assertions.assertTrue(setConf.isSuccess()); + + RaftTestUtil.deIsolate(cluster, leaderId); + final RaftClientReply + reply = client.io().send(new RaftTestUtil.SimpleMessage("final")); + Assertions.assertTrue(reply.isSuccess()); + } + } finally { + cluster.shutdown(); + } + } + private static class StateMachineWithMultiNestedSnapshotFile extends SimpleStateMachine4Testing { File snapshotRoot; diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 2c1d34e27..fa1226685 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -513,6 +513,20 @@ public interface RaftTestUtil { Thread.sleep(3 * maxTimeout.toLong(TimeUnit.MILLISECONDS)); } + static void isolate(MiniRaftCluster cluster, RaftPeerId id) { + try { + BlockRequestHandlingInjection.getInstance().blockReplier(id.toString()); + cluster.setBlockRequestsFrom(id.toString(), true); + } catch (Exception e) { + e.printStackTrace(); + } + } + + static void deIsolate(MiniRaftCluster cluster, RaftPeerId id) { + BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString()); + cluster.setBlockRequestsFrom(id.toString(), false); + } + static Thread sendMessageInNewThread(MiniRaftCluster cluster, RaftPeerId leaderId, SimpleMessage... messages) { Thread t = new Thread(() -> { try (final RaftClient client = cluster.createClient(leaderId)) { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index fbb909ac9..d7d58eff1 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -122,12 +122,12 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> final TimeDuration maxTimeout = RaftServerConfigKeys.Rpc.timeoutMax(getProperties()); final RaftServer.Division leader = waitForLeader(cluster); try { - isolate(cluster, leader.getId()); + RaftTestUtil.isolate(cluster, leader.getId()); maxTimeout.sleep(); maxTimeout.sleep(); RaftServerTestUtil.assertLostMajorityHeartbeatsRecently(leader); } finally { - deIsolate(cluster, leader.getId()); + RaftTestUtil.deIsolate(cluster, leader.getId()); } } @@ -164,12 +164,12 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> final RaftServer.Division listener = cluster.getListeners().get(0); final RaftPeerId listenerId = listener.getId(); try { - isolate(cluster, listenerId); + RaftTestUtil.isolate(cluster, listenerId); maxTimeout.sleep(); maxTimeout.sleep(); Assertions.assertEquals(RaftProtos.RaftPeerRole.LISTENER, listener.getInfo().getCurrentRole()); } finally { - deIsolate(cluster, listener.getId()); + RaftTestUtil.deIsolate(cluster, listener.getId()); } } @@ -247,7 +247,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> RaftServer.Division newLeader = followers.get(0); // isolate new leader, so that transfer leadership will timeout - isolate(cluster, newLeader.getId()); + RaftTestUtil.isolate(cluster, newLeader.getId()); List<RaftPeer> peers = cluster.getPeers(); @@ -287,7 +287,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> Assertions.assertEquals(leader.getId().toString(), reply.getReplierId()); Assertions.assertTrue(reply.isSuccess()); - deIsolate(cluster, newLeader.getId()); + RaftTestUtil.deIsolate(cluster, newLeader.getId()); } cluster.shutdown(); @@ -364,32 +364,18 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> try (RaftClient client = cluster.createClient(leader.getId())) { client.io().send(new RaftTestUtil.SimpleMessage("message")); Thread.sleep(1000); - isolate(cluster, leader.getId()); + RaftTestUtil.isolate(cluster, leader.getId()); RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message")); Assertions.assertNotEquals(reply.getReplierId(), leader.getId().toString()); Assertions.assertTrue(reply.isSuccess()); } finally { - deIsolate(cluster, leader.getId()); + RaftTestUtil.deIsolate(cluster, leader.getId()); } cluster.shutdown(); } } - private void isolate(MiniRaftCluster cluster, RaftPeerId id) { - try { - BlockRequestHandlingInjection.getInstance().blockReplier(id.toString()); - cluster.setBlockRequestsFrom(id.toString(), true); - } catch (Exception e) { - e.printStackTrace(); - } - } - - private void deIsolate(MiniRaftCluster cluster, RaftPeerId id) { - BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString()); - cluster.setBlockRequestsFrom(id.toString(), false); - } - @Test public void testAddListener() throws Exception { try (final MiniRaftCluster cluster = newCluster(3)) { @@ -570,7 +556,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> assertEquals(followers.size(), 2); RaftServer.Division follower = followers.get(0); - isolate(cluster, follower.getId()); + RaftTestUtil.isolate(cluster, follower.getId()); // send message so that the isolated follower's log lag the others RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message")); Assertions.assertTrue(reply.isSuccess()); @@ -578,7 +564,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> final long savedTerm = leader.getInfo().getCurrentTerm(); LOG.info("Wait follower {} timeout and trigger pre-vote", follower.getId()); Thread.sleep(2000); - deIsolate(cluster, follower.getId()); + RaftTestUtil.deIsolate(cluster, follower.getId()); Thread.sleep(2000); // with pre-vote leader will not step down RaftServer.Division newleader = waitForLeader(cluster); @@ -668,14 +654,14 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> Assertions.assertTrue(leader.getInfo().isLeaderReady()); RaftServerTestUtil.assertLeaderLease(leader, true); - isolate(cluster, leader.getId()); + RaftTestUtil.isolate(cluster, leader.getId()); Thread.sleep(leaseTimeoutMs); Assertions.assertTrue(leader.getInfo().isLeader()); Assertions.assertTrue(leader.getInfo().isLeaderReady()); RaftServerTestUtil.assertLeaderLease(leader, false); } finally { - deIsolate(cluster, leader.getId()); + RaftTestUtil.deIsolate(cluster, leader.getId()); } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java index 22c590c9d..b85cd1353 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java @@ -46,4 +46,10 @@ implements MiniRaftClusterWithGrpc.FactoryGet { super.testSeparateSnapshotInstallPath(); } + @ParameterizedTest + @MethodSource("data") + public void testInstallSnapshotLeaderSwitch(Boolean separateHeartbeat) throws Exception { + GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat); + super.testInstallSnapshotLeaderSwitch(); + } }
