This is an automated email from the ASF dual-hosted git repository.
williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 924a0cdf4 RATIS-2084. Follower reply ALREADY_INSTALLED when install old snapshots from leader (#1091)
924a0cdf4 is described below
commit 924a0cdf43557d67209805627bd6b1ec941776f0
Author: William Song <[email protected]>
AuthorDate: Thu May 30 15:44:21 2024 +0800
RATIS-2084. Follower reply ALREADY_INSTALLED when install old snapshots from leader (#1091)
---
.../apache/ratis/server/impl/RaftServerImpl.java | 1 +
.../org/apache/ratis/server/impl/ServerState.java | 4 ++
.../server/impl/SnapshotInstallationHandler.java | 8 ++-
.../ratis/InstallSnapshotFromLeaderTests.java | 70 ++++++++++++++++++++++
.../test/java/org/apache/ratis/RaftTestUtil.java | 14 +++++
.../ratis/server/impl/LeaderElectionTests.java | 38 ++++--------
.../ratis/grpc/TestLeaderInstallSnapshot.java | 6 ++
7 files changed, 112 insertions(+), 29 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index cba3bba1c..7ec94076f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1875,6 +1875,7 @@ class RaftServerImpl implements RaftServer.Division,
* @param logEntry the log entry being truncated
*/
void notifyTruncatedLogEntry(LogEntryProto logEntry) {
+ Optional.ofNullable(getState()).ifPresent(s -> s.truncate(logEntry.getIndex()));
if (logEntry.hasStateMachineLogEntry()) {
getTransactionManager().remove(TermIndex.valueOf(logEntry));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 0f46c6b52..c49e9554f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -387,6 +387,10 @@ class ServerState {
LOG.trace("{}: {}", getMemberId(), configurationManager);
}
+ void truncate(long logIndex) {
+ configurationManager.removeConfigurations(logIndex);
+ }
+
void updateConfiguration(List<LogEntryProto> entries) {
if (entries != null && !entries.isEmpty()) {
configurationManager.removeConfigurations(entries.get(0).getIndex());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index f03e2d883..4a63e64ee 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -133,6 +133,7 @@ class SnapshotInstallationHandler {
if (request.hasLastRaftConfigurationLogEntryProto()) {
// Set the configuration included in the snapshot
final LogEntryProto proto = request.getLastRaftConfigurationLogEntryProto();
+ state.truncate(proto.getIndex());
if (!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(proto))) {
LOG.info("{}: set new configuration {} from snapshot", getMemberId(), proto);
state.setRaftConf(proto);
@@ -175,9 +176,10 @@ class SnapshotInstallationHandler {
// Check and append the snapshot chunk. We simply put this in lock
// considering a follower peer requiring a snapshot installation does not
// have a lot of requests
- Preconditions.assertTrue(state.getLog().getLastCommittedIndex() < lastIncludedIndex,
- "%s log's commit index is %s, last included index in snapshot is %s",
- getMemberId(), state.getLog().getLastCommittedIndex(), lastIncludedIndex);
+ if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
+ return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED);
+ }
//TODO: We should only update State with installed snapshot once the request is done.
state.installSnapshot(request);
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
index 46cfebbd1..b83a7dfdd 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
@@ -21,7 +21,10 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
+import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
@@ -85,6 +88,12 @@ public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftClu
runWithNewCluster(1, this::testMultiFileInstallSnapshot);
}
+ public void testInstallSnapshotLeaderSwitch() throws Exception {
+ getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+ StateMachineWithSeparatedSnapshotPath.class, StateMachine.class);
+ runWithNewCluster(3, this::testInstallSnapshotDuringLeaderSwitch);
+ }
+
private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
try {
int i = 0;
@@ -127,6 +136,67 @@ public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftClu
}
}
+ private void testInstallSnapshotDuringLeaderSwitch(CLUSTER cluster) throws Exception {
+ try {
+ RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = cluster.getLeader().getId();
+
+ // perform operations and force all peers to take snapshot
+ try (final RaftClient client = cluster.createClient(leaderId)) {
+ for (int i = 0; i < SNAPSHOT_TRIGGER_THRESHOLD * 2; i++) {
+ final RaftClientReply
+ reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+ Assertions.assertTrue(reply.isSuccess());
+ }
+
+ for (final RaftPeer peer: cluster.getPeers()) {
+ final RaftClientReply snapshotReply = client.getSnapshotManagementApi(peer.getId()).create(3000);
+ Assertions.assertTrue(snapshotReply.isSuccess());
+ }
+ }
+ final SnapshotInfo snapshot = cluster.getLeader().getStateMachine().getLatestSnapshot();
+ Assertions.assertNotNull(snapshot);
+
+ // isolate two followers (majority) in old configuration
+ final List<RaftServer.Division> oldFollowers = cluster.getFollowers();
+ for (RaftServer.Division f: oldFollowers) {
+ RaftTestUtil.isolate(cluster, f.getId());
+ }
+
+ // add two more peers and install snapshot from the leader
+ final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
+ true);
+ try (final RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
+ Assertions.assertThrows(RaftRetryFailureException.class,
+ () -> client.admin().setConfiguration(change.allPeersInNewConf));
+ }
+
+ final SnapshotInfo snapshotInfo = cluster.getDivision(change.newPeers[0].getId())
+ .getStateMachine().getLatestSnapshot();
+ Assertions.assertNotNull(snapshotInfo);
+
+ // recover the old followers and isolate the leader to force leader switch
+ RaftTestUtil.isolate(cluster, leaderId);
+ for (RaftServer.Division f: oldFollowers) {
+ RaftTestUtil.deIsolate(cluster, f.getId());
+ }
+ RaftTestUtil.waitForLeader(cluster);
+
+ try (final RaftClient client = cluster.createClient(cluster.getLeader().getId())) {
+ // successfully setConfiguration during leader switch
+ final RaftClientReply setConf = client.admin().setConfiguration(change.allPeersInNewConf);
+ Assertions.assertTrue(setConf.isSuccess());
+
+ RaftTestUtil.deIsolate(cluster, leaderId);
+ final RaftClientReply
+ reply = client.io().send(new RaftTestUtil.SimpleMessage("final"));
+ Assertions.assertTrue(reply.isSuccess());
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
private static class StateMachineWithMultiNestedSnapshotFile extends SimpleStateMachine4Testing {
File snapshotRoot;
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index f7a3f9a52..be8739ad8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -514,6 +514,20 @@ public interface RaftTestUtil {
Thread.sleep(3 * maxTimeout.toLong(TimeUnit.MILLISECONDS));
}
+ static void isolate(MiniRaftCluster cluster, RaftPeerId id) {
+ try {
+ BlockRequestHandlingInjection.getInstance().blockReplier(id.toString());
+ cluster.setBlockRequestsFrom(id.toString(), true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ static void deIsolate(MiniRaftCluster cluster, RaftPeerId id) {
+ BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
+ cluster.setBlockRequestsFrom(id.toString(), false);
+ }
+
static Thread sendMessageInNewThread(MiniRaftCluster cluster, RaftPeerId leaderId, SimpleMessage... messages) {
Thread t = new Thread(() -> {
try (final RaftClient client = cluster.createClient(leaderId)) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 69791896a..ecb4a3dc0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -122,12 +122,12 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
final TimeDuration maxTimeout = RaftServerConfigKeys.Rpc.timeoutMax(getProperties());
final RaftServer.Division leader = waitForLeader(cluster);
try {
- isolate(cluster, leader.getId());
+ RaftTestUtil.isolate(cluster, leader.getId());
maxTimeout.sleep();
maxTimeout.sleep();
RaftServerTestUtil.assertLostMajorityHeartbeatsRecently(leader);
} finally {
- deIsolate(cluster, leader.getId());
+ RaftTestUtil.deIsolate(cluster, leader.getId());
}
}
@@ -164,12 +164,12 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
final RaftServer.Division listener = cluster.getListeners().get(0);
final RaftPeerId listenerId = listener.getId();
try {
- isolate(cluster, listenerId);
+ RaftTestUtil.isolate(cluster, listenerId);
maxTimeout.sleep();
maxTimeout.sleep();
Assertions.assertEquals(RaftProtos.RaftPeerRole.LISTENER, listener.getInfo().getCurrentRole());
} finally {
- deIsolate(cluster, listener.getId());
+ RaftTestUtil.deIsolate(cluster, listener.getId());
}
}
@@ -247,7 +247,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
RaftServer.Division newLeader = followers.get(0);
// isolate new leader, so that transfer leadership will timeout
- isolate(cluster, newLeader.getId());
+ RaftTestUtil.isolate(cluster, newLeader.getId());
List<RaftPeer> peers = cluster.getPeers();
@@ -287,7 +287,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
Assertions.assertEquals(leader.getId().toString(), reply.getReplierId());
Assertions.assertTrue(reply.isSuccess());
- deIsolate(cluster, newLeader.getId());
+ RaftTestUtil.deIsolate(cluster, newLeader.getId());
}
cluster.shutdown();
@@ -364,32 +364,18 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
try (RaftClient client = cluster.createClient(leader.getId())) {
client.io().send(new RaftTestUtil.SimpleMessage("message"));
Thread.sleep(1000);
- isolate(cluster, leader.getId());
+ RaftTestUtil.isolate(cluster, leader.getId());
RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
Assertions.assertNotEquals(reply.getReplierId(), leader.getId().toString());
Assertions.assertTrue(reply.isSuccess());
} finally {
- deIsolate(cluster, leader.getId());
+ RaftTestUtil.deIsolate(cluster, leader.getId());
}
cluster.shutdown();
}
}
- private void isolate(MiniRaftCluster cluster, RaftPeerId id) {
- try {
- BlockRequestHandlingInjection.getInstance().blockReplier(id.toString());
- cluster.setBlockRequestsFrom(id.toString(), true);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private void deIsolate(MiniRaftCluster cluster, RaftPeerId id) {
- BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
- cluster.setBlockRequestsFrom(id.toString(), false);
- }
-
@Test
public void testAddListener() throws Exception {
try (final MiniRaftCluster cluster = newCluster(3)) {
@@ -571,7 +557,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
assertEquals(followers.size(), 2);
RaftServer.Division follower = followers.get(0);
- isolate(cluster, follower.getId());
+ RaftTestUtil.isolate(cluster, follower.getId());
// send message so that the isolated follower's log lag the others
RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
Assertions.assertTrue(reply.isSuccess());
@@ -579,7 +565,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
final long savedTerm = leader.getInfo().getCurrentTerm();
LOG.info("Wait follower {} timeout and trigger pre-vote", follower.getId());
Thread.sleep(2000);
- deIsolate(cluster, follower.getId());
+ RaftTestUtil.deIsolate(cluster, follower.getId());
Thread.sleep(2000);
// with pre-vote leader will not step down
RaftServer.Division newleader = waitForLeader(cluster);
@@ -670,14 +656,14 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
Assertions.assertTrue(leader.getInfo().isLeaderReady());
RaftServerTestUtil.assertLeaderLease(leader, true);
- isolate(cluster, leader.getId());
+ RaftTestUtil.isolate(cluster, leader.getId());
Thread.sleep(leaseTimeoutMs);
Assertions.assertTrue(leader.getInfo().isLeader());
Assertions.assertTrue(leader.getInfo().isLeaderReady());
RaftServerTestUtil.assertLeaderLease(leader, false);
} finally {
- deIsolate(cluster, leader.getId());
+ RaftTestUtil.deIsolate(cluster, leader.getId());
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
index 22c590c9d..b85cd1353 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
@@ -46,4 +46,10 @@ implements MiniRaftClusterWithGrpc.FactoryGet {
super.testSeparateSnapshotInstallPath();
}
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testInstallSnapshotLeaderSwitch(Boolean separateHeartbeat) throws Exception {
+ GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat);
+ super.testInstallSnapshotLeaderSwitch();
+ }
}