This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 924a0cdf4 RATIS-2084. Follower reply ALREADY_INSTALLED when install old snapshots from leader (#1091)
924a0cdf4 is described below

commit 924a0cdf43557d67209805627bd6b1ec941776f0
Author: William Song <[email protected]>
AuthorDate: Thu May 30 15:44:21 2024 +0800

    RATIS-2084. Follower reply ALREADY_INSTALLED when install old snapshots from leader (#1091)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   |  1 +
 .../org/apache/ratis/server/impl/ServerState.java  |  4 ++
 .../server/impl/SnapshotInstallationHandler.java   |  8 ++-
 .../ratis/InstallSnapshotFromLeaderTests.java      | 70 ++++++++++++++++++++++
 .../test/java/org/apache/ratis/RaftTestUtil.java   | 14 +++++
 .../ratis/server/impl/LeaderElectionTests.java     | 38 ++++--------
 .../ratis/grpc/TestLeaderInstallSnapshot.java      |  6 ++
 7 files changed, 112 insertions(+), 29 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index cba3bba1c..7ec94076f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1875,6 +1875,7 @@ class RaftServerImpl implements RaftServer.Division,
   * @param logEntry the log entry being truncated
   */
  void notifyTruncatedLogEntry(LogEntryProto logEntry) {
+    Optional.ofNullable(getState()).ifPresent(s -> s.truncate(logEntry.getIndex()));
    if (logEntry.hasStateMachineLogEntry()) {
      getTransactionManager().remove(TermIndex.valueOf(logEntry));
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 0f46c6b52..c49e9554f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -387,6 +387,10 @@ class ServerState {
     LOG.trace("{}: {}", getMemberId(), configurationManager);
   }
 
+  void truncate(long logIndex) {
+    configurationManager.removeConfigurations(logIndex);
+  }
+
   void updateConfiguration(List<LogEntryProto> entries) {
     if (entries != null && !entries.isEmpty()) {
       configurationManager.removeConfigurations(entries.get(0).getIndex());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index f03e2d883..4a63e64ee 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -133,6 +133,7 @@ class SnapshotInstallationHandler {
       if (request.hasLastRaftConfigurationLogEntryProto()) {
         // Set the configuration included in the snapshot
         final LogEntryProto proto = request.getLastRaftConfigurationLogEntryProto();
+        state.truncate(proto.getIndex());
         if (!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(proto))) {
           LOG.info("{}: set new configuration {} from snapshot", getMemberId(), proto);
           state.setRaftConf(proto);
@@ -175,9 +176,10 @@ class SnapshotInstallationHandler {
         // Check and append the snapshot chunk. We simply put this in lock
         // considering a follower peer requiring a snapshot installation does not
         // have a lot of requests
-        Preconditions.assertTrue(state.getLog().getLastCommittedIndex() < lastIncludedIndex,
-            "%s log's commit index is %s, last included index in snapshot is %s",
-            getMemberId(), state.getLog().getLastCommittedIndex(), lastIncludedIndex);
+        if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
+          return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+              currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED);
+        }
 
         //TODO: We should only update State with installed snapshot once the request is done.
         state.installSnapshot(request);
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
index 46cfebbd1..b83a7dfdd 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
@@ -21,7 +21,10 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
+import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.MiniRaftCluster;
@@ -85,6 +88,12 @@ public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftClu
     runWithNewCluster(1, this::testMultiFileInstallSnapshot);
   }
 
+  public void testInstallSnapshotLeaderSwitch() throws Exception {
+    getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        StateMachineWithSeparatedSnapshotPath.class, StateMachine.class);
+    runWithNewCluster(3, this::testInstallSnapshotDuringLeaderSwitch);
+  }
+
   private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
     try {
       int i = 0;
@@ -127,6 +136,67 @@ public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftClu
     }
   }
 
+  private void testInstallSnapshotDuringLeaderSwitch(CLUSTER cluster) throws Exception {
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+
+      // perform operations and force all peers to take snapshot
+      try (final RaftClient client = cluster.createClient(leaderId)) {
+        for (int i = 0; i < SNAPSHOT_TRIGGER_THRESHOLD * 2; i++) {
+          final RaftClientReply
+              reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+          Assertions.assertTrue(reply.isSuccess());
+        }
+
+        for (final RaftPeer peer: cluster.getPeers()) {
+          final RaftClientReply snapshotReply = client.getSnapshotManagementApi(peer.getId()).create(3000);
+          Assertions.assertTrue(snapshotReply.isSuccess());
+        }
+      }
+      final SnapshotInfo snapshot = cluster.getLeader().getStateMachine().getLatestSnapshot();
+      Assertions.assertNotNull(snapshot);
+
+      // isolate two followers (majority) in old configuration
+      final List<RaftServer.Division> oldFollowers = cluster.getFollowers();
+      for (RaftServer.Division f: oldFollowers) {
+        RaftTestUtil.isolate(cluster, f.getId());
+      }
+
+      // add two more peers and install snapshot from the leader
+      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
+          true);
+      try (final RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
+        Assertions.assertThrows(RaftRetryFailureException.class,
+                 () -> client.admin().setConfiguration(change.allPeersInNewConf));
+      }
+
+      final SnapshotInfo snapshotInfo = cluster.getDivision(change.newPeers[0].getId())
+           .getStateMachine().getLatestSnapshot();
+      Assertions.assertNotNull(snapshotInfo);
+
+      // recover the old followers and isolate the leader to force leader switch
+      RaftTestUtil.isolate(cluster, leaderId);
+      for (RaftServer.Division f: oldFollowers) {
+        RaftTestUtil.deIsolate(cluster, f.getId());
+      }
+      RaftTestUtil.waitForLeader(cluster);
+
+      try (final RaftClient client = cluster.createClient(cluster.getLeader().getId())) {
+        // successfully setConfiguration during leader switch
+        final RaftClientReply setConf = client.admin().setConfiguration(change.allPeersInNewConf);
+        Assertions.assertTrue(setConf.isSuccess());
+
+        RaftTestUtil.deIsolate(cluster, leaderId);
+        final RaftClientReply
+              reply = client.io().send(new RaftTestUtil.SimpleMessage("final"));
+        Assertions.assertTrue(reply.isSuccess());
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   private static class StateMachineWithMultiNestedSnapshotFile extends SimpleStateMachine4Testing {
 
     File snapshotRoot;
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index f7a3f9a52..be8739ad8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -514,6 +514,20 @@ public interface RaftTestUtil {
     Thread.sleep(3 * maxTimeout.toLong(TimeUnit.MILLISECONDS));
   }
 
+  static void isolate(MiniRaftCluster cluster, RaftPeerId id) {
+    try {
+      BlockRequestHandlingInjection.getInstance().blockReplier(id.toString());
+      cluster.setBlockRequestsFrom(id.toString(), true);
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to isolate " + id, e);
+    }
+  }
+
+  static void deIsolate(MiniRaftCluster cluster, RaftPeerId id) {
+    BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
+    cluster.setBlockRequestsFrom(id.toString(), false);
+  }
+
   static Thread sendMessageInNewThread(MiniRaftCluster cluster, RaftPeerId leaderId, SimpleMessage... messages) {
     Thread t = new Thread(() -> {
       try (final RaftClient client = cluster.createClient(leaderId)) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 69791896a..ecb4a3dc0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -122,12 +122,12 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
     final TimeDuration maxTimeout = RaftServerConfigKeys.Rpc.timeoutMax(getProperties());
     final RaftServer.Division leader = waitForLeader(cluster);
     try {
-      isolate(cluster, leader.getId());
+      RaftTestUtil.isolate(cluster, leader.getId());
       maxTimeout.sleep();
       maxTimeout.sleep();
       RaftServerTestUtil.assertLostMajorityHeartbeatsRecently(leader);
     } finally {
-      deIsolate(cluster, leader.getId());
+      RaftTestUtil.deIsolate(cluster, leader.getId());
     }
   }
 
@@ -164,12 +164,12 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
     final RaftServer.Division listener = cluster.getListeners().get(0);
     final RaftPeerId listenerId = listener.getId();
     try {
-      isolate(cluster, listenerId);
+      RaftTestUtil.isolate(cluster, listenerId);
       maxTimeout.sleep();
       maxTimeout.sleep();
       Assertions.assertEquals(RaftProtos.RaftPeerRole.LISTENER, listener.getInfo().getCurrentRole());
     } finally {
-      deIsolate(cluster, listener.getId());
+      RaftTestUtil.deIsolate(cluster, listener.getId());
     }
   }
 
@@ -247,7 +247,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
         RaftServer.Division newLeader = followers.get(0);
 
         // isolate new leader, so that transfer leadership will timeout
-        isolate(cluster, newLeader.getId());
+        RaftTestUtil.isolate(cluster, newLeader.getId());
 
         List<RaftPeer> peers = cluster.getPeers();
 
@@ -287,7 +287,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
         Assertions.assertEquals(leader.getId().toString(), reply.getReplierId());
         Assertions.assertTrue(reply.isSuccess());
 
-        deIsolate(cluster, newLeader.getId());
+        RaftTestUtil.deIsolate(cluster, newLeader.getId());
       }
 
       cluster.shutdown();
@@ -364,32 +364,18 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
       try (RaftClient client = cluster.createClient(leader.getId())) {
         client.io().send(new RaftTestUtil.SimpleMessage("message"));
         Thread.sleep(1000);
-        isolate(cluster, leader.getId());
+        RaftTestUtil.isolate(cluster, leader.getId());
         RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
         Assertions.assertNotEquals(reply.getReplierId(), leader.getId().toString());
         Assertions.assertTrue(reply.isSuccess());
       } finally {
-        deIsolate(cluster, leader.getId());
+        RaftTestUtil.deIsolate(cluster, leader.getId());
       }
 
       cluster.shutdown();
     }
   }
 
-  private void isolate(MiniRaftCluster cluster, RaftPeerId id) {
-    try {
-      BlockRequestHandlingInjection.getInstance().blockReplier(id.toString());
-      cluster.setBlockRequestsFrom(id.toString(), true);
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-
-  private void deIsolate(MiniRaftCluster cluster, RaftPeerId id) {
-    BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
-    cluster.setBlockRequestsFrom(id.toString(), false);
-  }
-
   @Test
   public void testAddListener() throws Exception {
     try (final MiniRaftCluster cluster = newCluster(3)) {
@@ -571,7 +557,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
         assertEquals(followers.size(), 2);
 
         RaftServer.Division follower = followers.get(0);
-        isolate(cluster, follower.getId());
+        RaftTestUtil.isolate(cluster, follower.getId());
         // send message so that the isolated follower's log lag the others
         RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
         Assertions.assertTrue(reply.isSuccess());
@@ -579,7 +565,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
         final long savedTerm = leader.getInfo().getCurrentTerm();
         LOG.info("Wait follower {} timeout and trigger pre-vote", follower.getId());
         Thread.sleep(2000);
-        deIsolate(cluster, follower.getId());
+        RaftTestUtil.deIsolate(cluster, follower.getId());
         Thread.sleep(2000);
         // with pre-vote leader will not step down
         RaftServer.Division newleader = waitForLeader(cluster);
@@ -670,14 +656,14 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
       Assertions.assertTrue(leader.getInfo().isLeaderReady());
       RaftServerTestUtil.assertLeaderLease(leader, true);
 
-      isolate(cluster, leader.getId());
+      RaftTestUtil.isolate(cluster, leader.getId());
       Thread.sleep(leaseTimeoutMs);
 
       Assertions.assertTrue(leader.getInfo().isLeader());
       Assertions.assertTrue(leader.getInfo().isLeaderReady());
       RaftServerTestUtil.assertLeaderLease(leader, false);
     } finally {
-      deIsolate(cluster, leader.getId());
+      RaftTestUtil.deIsolate(cluster, leader.getId());
     }
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
index 22c590c9d..b85cd1353 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
@@ -46,4 +46,10 @@ implements MiniRaftClusterWithGrpc.FactoryGet {
         super.testSeparateSnapshotInstallPath();
     }
 
+    @ParameterizedTest
+    @MethodSource("data")
+    public void testInstallSnapshotLeaderSwitch(Boolean separateHeartbeat) throws Exception {
+        GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat);
+        super.testInstallSnapshotLeaderSwitch();
+    }
 }

Reply via email to