This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 371fbfe RATIS-1305. Leader stuck in infinite install snapshot cycle
when logs have been purged (#420). Contributed by Ethan Rose
371fbfe is described below
commit 371fbfe36bee450fdac8cc9ecc0c91f1fd142859
Author: Ethan Rose <[email protected]>
AuthorDate: Thu Feb 11 19:00:21 2021 -0500
RATIS-1305. Leader stuck in infinite install snapshot cycle when logs have
been purged (#420). Contributed by Ethan Rose
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 13 ++-
.../ratis/InstallSnapshotNotificationTests.java | 103 ++++++++++++++++++++-
2 files changed, 113 insertions(+), 3 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 1c8e4ea..3f2ef99 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -562,8 +562,16 @@ public class GrpcLogAppender extends LogAppenderBase {
* @return the first available log's start term index
*/
private TermIndex shouldNotifyToInstallSnapshot() {
+ final long followerNextIndex = getFollower().getNextIndex();
+ final long leaderNextIndex = getRaftLog().getNextIndex();
+
+ if (followerNextIndex >= leaderNextIndex) {
+ return null;
+ }
+
final long leaderStartIndex = getRaftLog().getStartIndex();
- if (getFollower().getNextIndex() < leaderStartIndex) {
+
+ if (followerNextIndex < leaderStartIndex) {
// The Leader does not have the logs from the Follower's last log
// index onwards. And install snapshot is disabled. So the Follower
// should be notified to install the latest snapshot through its
@@ -572,8 +580,9 @@ public class GrpcLogAppender extends LogAppenderBase {
} else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) {
// Leader has no logs to check from, hence return next index.
return TermIndex.valueOf(getServer().getInfo().getCurrentTerm(),
- getRaftLog().getNextIndex());
+ leaderNextIndex);
}
+
return null;
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index c6bedea..a4c25da 100644
---
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -30,7 +30,6 @@ import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
-import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.SnapshotInfo;
@@ -51,6 +50,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public abstract class InstallSnapshotNotificationTests<CLUSTER extends
MiniRaftCluster>
@@ -79,11 +79,16 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
private static final int PURGE_GAP = 8;
private static final AtomicReference<SnapshotInfo> leaderSnapshotInfoRef =
new AtomicReference<>();
+ private static final AtomicInteger numSnapshotRequests = new AtomicInteger();
+
private static class StateMachine4InstallSnapshotNotificationTests extends
SimpleStateMachine4Testing {
@Override
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RaftProtos.RoleInfoProto roleInfoProto,
TermIndex termIndex) {
+
+ numSnapshotRequests.incrementAndGet();
+
final SingleFileSnapshotInfo leaderSnapshotInfo =
(SingleFileSnapshotInfo) leaderSnapshotInfoRef.get();
LOG.info("{}: leaderSnapshotInfo = {}", getId(), leaderSnapshotInfo);
if (leaderSnapshotInfo == null) {
@@ -236,6 +241,102 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
Assert.assertTrue(newLeaderNextIndex > oldLeaderNextIndex);
Assert.assertEquals(newLeaderNextIndex,
follower.getRaftLog().getNextIndex());
}, 10, ONE_SECOND, "followerNextIndex", LOG);
+ }
+
+ /**
+ * Regression test for RATIS-1305: runs the install-snapshot notification
+ * count scenario on a new 3-server cluster.
+ */
+ @Test
+ public void testInstallSnapshotNotificationCount() throws Exception {
+ runWithNewCluster(3, this::testInstallSnapshotNotificationCount);
+ }
+
+
+  /**
+   * Checks that install-snapshot notifications are sent only to peers that
+   * actually need them (regression test for RATIS-1305).
+   *
+   * <p>Steps: write 10 messages, take a snapshot on the leader, then purge all
+   * of the leader's log segments so that its log start index becomes
+   * {@link RaftLog#INVALID_LOG_INDEX}. The existing followers are already up
+   * to date, so no notification may be sent to them. Two new peers are then
+   * added; exactly two notifications are expected in total (one per new peer).
+   *
+   * <p>The cluster is shut down in a {@code finally} block, whether the test
+   * passes or fails.
+   */
+  private void testInstallSnapshotNotificationCount(CLUSTER cluster) throws Exception {
+    leaderSnapshotInfoRef.set(null);
+    numSnapshotRequests.set(0);
+
+    int i = 0;
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+      // Let a few heartbeats pass.
+      ONE_SECOND.sleep();
+      Assert.assertEquals(0, numSnapshotRequests.get());
+
+      // Generate data.
+      try (final RaftClient client = cluster.createClient(leaderId)) {
+        for (; i < 10; i++) {
+          final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+          Assert.assertTrue(reply.isSuccess());
+        }
+      }
+
+      // Take snapshot and check result.
+      final long snapshotIndex = cluster.getLeader().getStateMachine().takeSnapshot();
+      Assert.assertEquals(20, snapshotIndex);
+      final SnapshotInfo leaderSnapshotInfo = cluster.getLeader().getStateMachine().getLatestSnapshot();
+      Assert.assertEquals(20, leaderSnapshotInfo.getIndex());
+      final boolean set = leaderSnapshotInfoRef.compareAndSet(null, leaderSnapshotInfo);
+      Assert.assertTrue(set);
+
+      // Wait for the snapshot to be done.
+      final RaftServer.Division leader = cluster.getLeader();
+      final long nextIndex = leader.getRaftLog().getNextIndex();
+      Assert.assertEquals(21, nextIndex);
+      // End index is exclusive.
+      final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster, 0, nextIndex);
+      JavaUtils.attemptRepeatedly(() -> {
+        Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
+        return null;
+      }, 10, ONE_SECOND, "snapshotFile.exist", LOG);
+
+      // Clear all log files and reset cached log start index.
+      final long snapshotInstallIndex =
+          leader.getRaftLog().onSnapshotInstalled(leader.getRaftLog().getLastCommittedIndex()).get();
+      Assert.assertEquals(20, snapshotInstallIndex);
+
+      // Check that logs are gone.
+      Assert.assertEquals(0,
+          LogSegmentPath.getLogSegmentPaths(leader.getRaftStorage()).size());
+      Assert.assertEquals(RaftLog.INVALID_LOG_INDEX, leader.getRaftLog().getStartIndex());
+
+      // Allow some heartbeats to go through, then make sure none of them had
+      // snapshot requests.
+      ONE_SECOND.sleep();
+      Assert.assertEquals(0, numSnapshotRequests.get());
+
+      // Make sure leader and followers are still up to date. Retry rather than
+      // asserting once, since a follower may still be processing appends.
+      for (RaftServer.Division follower : cluster.getFollowers()) {
+        JavaUtils.attemptRepeatedly(() -> {
+          Assert.assertEquals(
+              leader.getRaftLog().getNextIndex(),
+              follower.getRaftLog().getNextIndex());
+          return null;
+        }, 10, ONE_SECOND, "followerNextIndex", LOG);
+      }
+
+      // Add two more peers who will need snapshots from the leader.
+      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true);
+      // trigger setConfiguration
+      cluster.setConfiguration(change.allPeersInNewConf);
+      RaftServerTestUtil.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
+
+      // Generate more data.
+      try (final RaftClient client = cluster.createClient(leader.getId())) {
+        Assert.assertTrue(client.io().send(new RaftTestUtil.SimpleMessage("m" + i)).isSuccess());
+      }
+
+      // Make sure leader and followers are still up to date. A successful
+      // client reply does not guarantee that every follower has already
+      // appended the entry (Raft commits once a majority has it), so give each
+      // follower time to catch up instead of asserting immediately.
+      for (RaftServer.Division follower : cluster.getFollowers()) {
+        JavaUtils.attemptRepeatedly(() -> {
+          Assert.assertEquals(
+              leader.getRaftLog().getNextIndex(),
+              follower.getRaftLog().getNextIndex());
+          return null;
+        }, 10, ONE_SECOND, "followerNextIndex", LOG);
+      }
+
+      // Make sure each new peer got one snapshot notification.
+      Assert.assertEquals(2, numSnapshotRequests.get());
+
+    } finally {
+      cluster.shutdown();
+    }
   }
}