This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 371fbfe  RATIS-1305. Leader stuck in infinite install snapshot cycle 
when logs have been purged (#420).  Contributed by Ethan Rose
371fbfe is described below

commit 371fbfe36bee450fdac8cc9ecc0c91f1fd142859
Author: Ethan Rose <[email protected]>
AuthorDate: Thu Feb 11 19:00:21 2021 -0500

    RATIS-1305. Leader stuck in infinite install snapshot cycle when logs have 
been purged (#420).  Contributed by Ethan Rose
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  13 ++-
 .../ratis/InstallSnapshotNotificationTests.java    | 103 ++++++++++++++++++++-
 2 files changed, 113 insertions(+), 3 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 1c8e4ea..3f2ef99 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -562,8 +562,16 @@ public class GrpcLogAppender extends LogAppenderBase {
    * @return the first available log's start term index
    */
   private TermIndex shouldNotifyToInstallSnapshot() {
+    final long followerNextIndex = getFollower().getNextIndex();
+    final long leaderNextIndex = getRaftLog().getNextIndex();
+
+    if (followerNextIndex >= leaderNextIndex) {
+      return null;
+    }
+
     final long leaderStartIndex = getRaftLog().getStartIndex();
-    if (getFollower().getNextIndex() < leaderStartIndex) {
+
+    if (followerNextIndex < leaderStartIndex) {
       // The Leader does not have the logs from the Follower's last log
       // index onwards. And install snapshot is disabled. So the Follower
       // should be notified to install the latest snapshot through its
@@ -572,8 +580,9 @@ public class GrpcLogAppender extends LogAppenderBase {
     } else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) {
       // Leader has no logs to check from, hence return next index.
       return TermIndex.valueOf(getServer().getInfo().getCurrentTerm(),
-          getRaftLog().getNextIndex());
+          leaderNextIndex);
     }
+
     return null;
   }
 
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index c6bedea..a4c25da 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -30,7 +30,6 @@ import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
-import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.SnapshotInfo;
@@ -51,6 +50,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 public abstract class InstallSnapshotNotificationTests<CLUSTER extends 
MiniRaftCluster>
@@ -79,11 +79,16 @@ public abstract class 
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
   private static final int PURGE_GAP = 8;
   private static final AtomicReference<SnapshotInfo> leaderSnapshotInfoRef = 
new AtomicReference<>();
 
+  private static final AtomicInteger numSnapshotRequests = new AtomicInteger();
+
   private static class StateMachine4InstallSnapshotNotificationTests extends 
SimpleStateMachine4Testing {
     @Override
     public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
         RaftProtos.RoleInfoProto roleInfoProto,
         TermIndex termIndex) {
+
+      numSnapshotRequests.incrementAndGet();
+
       final SingleFileSnapshotInfo leaderSnapshotInfo = 
(SingleFileSnapshotInfo) leaderSnapshotInfoRef.get();
       LOG.info("{}: leaderSnapshotInfo = {}", getId(), leaderSnapshotInfo);
       if (leaderSnapshotInfo == null) {
@@ -236,6 +241,102 @@ public abstract class 
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
       Assert.assertTrue(newLeaderNextIndex > oldLeaderNextIndex);
       Assert.assertEquals(newLeaderNextIndex, 
follower.getRaftLog().getNextIndex());
     }, 10, ONE_SECOND, "followerNextIndex", LOG);
+  }
+
+  @Test
+  public void testInstallSnapshotNotificationCount() throws Exception {
+    runWithNewCluster(3, this::testInstallSnapshotNotificationCount);
+  }
+
+
+  private void testInstallSnapshotNotificationCount(CLUSTER cluster) throws 
Exception {
+    leaderSnapshotInfoRef.set(null);
+    numSnapshotRequests.set(0);
+
+    int i = 0;
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
 
+      // Let a few heartbeats pass.
+      ONE_SECOND.sleep();
+      Assert.assertEquals(0, numSnapshotRequests.get());
+
+      // Generate data.
+      try(final RaftClient client = cluster.createClient(leaderId)) {
+        for (; i < 10; i++) {
+          RaftClientReply
+              reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + 
i));
+          Assert.assertTrue(reply.isSuccess());
+        }
+      }
+
+      // Take snapshot and check result.
+      long snapshotIndex = 
cluster.getLeader().getStateMachine().takeSnapshot();
+      Assert.assertEquals(20, snapshotIndex);
+      final SnapshotInfo leaderSnapshotInfo = 
cluster.getLeader().getStateMachine().getLatestSnapshot();
+      Assert.assertEquals(20, leaderSnapshotInfo.getIndex());
+      final boolean set = leaderSnapshotInfoRef.compareAndSet(null, 
leaderSnapshotInfo);
+      Assert.assertTrue(set);
+
+      // Wait for the snapshot to be done.
+      final RaftServer.Division leader = cluster.getLeader();
+      final long nextIndex = leader.getRaftLog().getNextIndex();
+      Assert.assertEquals(21, nextIndex);
+      // End index is exclusive.
+      final List<File> snapshotFiles = 
RaftSnapshotBaseTest.getSnapshotFiles(cluster,
+          0, nextIndex);
+      JavaUtils.attemptRepeatedly(() -> {
+        
Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
+        return null;
+      }, 10, ONE_SECOND, "snapshotFile.exist", LOG);
+
+      // Clear all log files and reset cached log start index.
+      long snapshotInstallIndex =
+          
leader.getRaftLog().onSnapshotInstalled(leader.getRaftLog().getLastCommittedIndex()).get();
+      Assert.assertEquals(20, snapshotInstallIndex);
+
+      // Check that logs are gone.
+      Assert.assertEquals(0,
+          LogSegmentPath.getLogSegmentPaths(leader.getRaftStorage()).size());
+      Assert.assertEquals(RaftLog.INVALID_LOG_INDEX, 
leader.getRaftLog().getStartIndex());
+
+      // Allow some heartbeats to go through, then make sure none of them had
+      // snapshot requests.
+      ONE_SECOND.sleep();
+      Assert.assertEquals(0, numSnapshotRequests.get());
+
+      // Make sure leader and followers are still up to date.
+      for (RaftServer.Division follower : cluster.getFollowers()) {
+        Assert.assertEquals(
+            leader.getRaftLog().getNextIndex(),
+            follower.getRaftLog().getNextIndex());
+      }
+
+      // Add two more peers who will need snapshots from the leader.
+      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true);
+      // trigger setConfiguration
+      cluster.setConfiguration(change.allPeersInNewConf);
+      RaftServerTestUtil
+          .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
+
+      // Generate more data.
+      try (final RaftClient client = cluster.createClient(leader.getId())) {
+        Assert.assertTrue(client.io().send(new RaftTestUtil.SimpleMessage("m" 
+ i)).isSuccess());
+      }
+
+      // Make sure leader and followers are still up to date.
+      for (RaftServer.Division follower : cluster.getFollowers()) {
+        Assert.assertEquals(
+            leader.getRaftLog().getNextIndex(),
+            follower.getRaftLog().getNextIndex());
+      }
+
+      // Make sure each new peer got one snapshot notification.
+      Assert.assertEquals(2, numSnapshotRequests.get());
+
+    } finally {
+      cluster.shutdown();
+    }
   }
 }

Reply via email to the mailing list.