Repository: incubator-ratis
Updated Branches:
  refs/heads/master ca0a1271d -> 4bd32cda3

RATIS-356. Add replication level for transactions committed by majority in watch api. Contributed by Jitendra Nath Pandey

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/4bd32cda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/4bd32cda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/4bd32cda

Branch: refs/heads/master
Commit: 4bd32cda3909dad92a618cbe7e194d62979978d5
Parents: ca0a127
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Fri Oct 19 12:18:57 2018 +0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Fri Oct 19 12:18:57 2018 +0800

----------------------------------------------------------------------
 ratis-proto/src/main/proto/Raft.proto | 10 ++-
 .../apache/ratis/server/impl/LeaderState.java | 18 +++--
 .../org/apache/ratis/WatchRequestTests.java | 76 +++++++++++++++++---
 3 files changed, 86 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4bd32cda/ratis-proto/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 6462573..0640954 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -174,9 +174,15 @@ enum ReplicationLevel {
 /** Committed at the leader and replicated to all peers.
 Note that ReplicationLevel.ALL implies ReplicationLevel.MAJORITY. */
 ALL = 1;
+
+ /** Committed at majority peers.
+ Note that ReplicationLevel.MAJORITY_COMMITTED implies ReplicationLevel.MAJORITY. */
+ MAJORITY_COMMITTED = 2;
+
 /** Committed at all peers.
- Note that ReplicationLevel.ALL_COMMITTED implies ReplicationLevel.ALL.
*/ - ALL_COMMITTED = 2; + Note that ReplicationLevel.ALL_COMMITTED implies ReplicationLevel.ALL + and ReplicationLevel.MAJORITY_COMMITTED */ + ALL_COMMITTED = 3; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4bd32cda/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index c647530..41cce99 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.LongStream; import java.util.stream.Stream; /** @@ -304,15 +305,18 @@ public class LeaderState { } void commitIndexChanged() { - final LongMinMax minMax = senders.stream() + final long[] commitIndices = LongStream.concat(LongStream.of( + raftLog.getLastCommittedIndex()), senders.stream() .map(LogAppender::getFollower) - .mapToLong(FollowerInfo::getCommitIndex) - .collect(LongMinMax::new, LongMinMax::accumulate, LongMinMax::combine); - minMax.accumulate(raftLog.getLastCommittedIndex()); + .mapToLong(FollowerInfo::getCommitIndex)) + .sorted().toArray(); + // Normally, leader commit index is always ahead followers. - // However, after a leader change, the new leader commit index may be behind some followers in the beginning. - watchRequests.update(ReplicationLevel.MAJORITY, minMax.getMax()); - watchRequests.update(ReplicationLevel.ALL_COMMITTED, minMax.getMin()); + // However, after a leader change, the new leader commit index may + // be behind some followers in the beginning. 
+ watchRequests.update(ReplicationLevel.MAJORITY, commitIndices[commitIndices.length-1]); + watchRequests.update(ReplicationLevel.ALL_COMMITTED, commitIndices[0]); + watchRequests.update(ReplicationLevel.MAJORITY_COMMITTED, getMajority(commitIndices)); } private void applyOldNewConf() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4bd32cda/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java index bb89bc0..9ff27ad 100644 --- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java @@ -76,18 +76,22 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final RaftClient writeClient; final RaftClient watchMajorityClient; final RaftClient watchAllClient; + final RaftClient watchMajorityCommittedClient; final RaftClient watchAllCommittedClient; final MiniRaftCluster cluster; final Logger log; - TestParameters(long startLogIndex, int numMessages, RaftClient writeClient, - RaftClient watchMajorityClient, RaftClient watchAllClient, RaftClient watchAllCommittedClient, + TestParameters( + long startLogIndex, int numMessages, RaftClient writeClient, + RaftClient watchMajorityClient, RaftClient watchAllClient, + RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient, MiniRaftCluster cluster, Logger log) { this.startLogIndex = startLogIndex; this.numMessages = numMessages; this.writeClient = writeClient; this.watchMajorityClient = watchMajorityClient; this.watchAllClient = watchAllClient; + this.watchMajorityCommittedClient = watchMajorityCommittedClient; this.watchAllCommittedClient = watchAllCommittedClient; this.cluster = cluster; this.log = log; @@ -111,13 +115,15 @@ public abstract class 
WatchRequestTests<CLUSTER extends MiniRaftCluster> try(final RaftClient writeClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); final RaftClient watchMajorityClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); final RaftClient watchAllClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); + final RaftClient watchMajorityCommittedClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); final RaftClient watchAllCommittedClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) { final int[] numMessages = {1, 10, 100}; for(int i = 0; i < 5; i++) { final long logIndex = getLogIndex(writeClient) + 1; final int n = numMessages[ThreadLocalRandom.current().nextInt(numMessages.length)]; final TestParameters p = new TestParameters( - logIndex, n, writeClient, watchMajorityClient, watchAllClient, watchAllCommittedClient, cluster, LOG); + logIndex, n, writeClient, watchMajorityClient, watchAllClient, + watchMajorityCommittedClient, watchAllCommittedClient, cluster, LOG); LOG.info("{}) {}, {}", i, p, cluster.printServers()); testCase.apply(p); } @@ -127,12 +133,15 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> static Void runTestWatchRequestAsync(TestParameters p) throws Exception { runTestWatchRequestAsync(p.startLogIndex, p.numMessages, - p.writeClient, p.watchMajorityClient, p.watchAllClient, p.watchAllCommittedClient, p.cluster, p.log); + p.writeClient, p.watchMajorityClient, p.watchAllClient, + p.watchMajorityCommittedClient, p.watchAllCommittedClient, p.cluster, p.log); return null; } - static void runTestWatchRequestAsync(long startLogIndex, int numMessages, - RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient, RaftClient watchAllCommittedClient, + static void runTestWatchRequestAsync( + long startLogIndex, int numMessages, + RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient, 
+ RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient, MiniRaftCluster cluster, Logger LOG) throws Exception { // blockStartTransaction of the leader so that no transaction can be committed MAJORITY final RaftServerImpl leader = cluster.getLeader(); @@ -149,6 +158,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>(); final List<CompletableFuture<RaftClientReply>> watchMajoritys = new ArrayList<>(); final List<CompletableFuture<RaftClientReply>> watchAlls = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchMajorityCommitteds = new ArrayList<>(); final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new ArrayList<>(); for(int i = 0; i < numMessages; i++) { @@ -158,12 +168,15 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> replies.add(writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message))); watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY)); watchAlls.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL)); + watchMajorityCommitteds.add(watchMajorityCommittedClient.sendWatchAsync( + logIndex, ReplicationLevel.MAJORITY_COMMITTED)); watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED)); } Assert.assertEquals(numMessages, replies.size()); Assert.assertEquals(numMessages, watchMajoritys.size()); Assert.assertEquals(numMessages, watchAlls.size()); + Assert.assertEquals(numMessages, watchMajorityCommitteds.size()); Assert.assertEquals(numMessages, watchAllCommitteds.size()); // since leader is blocked, nothing can be done. 
@@ -171,6 +184,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> assertNotDone(replies); assertNotDone(watchMajoritys); assertNotDone(watchAlls); + assertNotDone(watchMajorityCommitteds); assertNotDone(watchAllCommitteds); // unblock leader so that the transaction can be committed. @@ -185,7 +199,26 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final RaftClientReply watchMajorityReply = watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply); Assert.assertTrue(watchMajoritys.get(i).get().isSuccess()); + + final RaftClientReply watchMajorityCommittedReply + = watchMajorityCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + LOG.info("watchMajorityCommittedReply({}) = ", logIndex, watchMajorityCommittedReply); + Assert.assertTrue(watchMajorityCommittedReply.isSuccess()); + { // check commit infos + final Collection<CommitInfoProto> commitInfos = watchMajorityCommittedReply.getCommitInfos(); + Assert.assertEquals(NUM_SERVERS, commitInfos.size()); + + // One follower has not committed, so min must be less than logIndex + Assert.assertTrue(commitInfos.stream() + .map(CommitInfoProto::getCommitIndex).min(Long::compare).get() < logIndex); + + // All other followers have committed + commitInfos.stream() + .map(CommitInfoProto::getCommitIndex).sorted(Long::compare) + .skip(1).forEach(ci -> Assert.assertTrue(logIndex <= ci)); + } } + // but not replicated/committed to all. 
TimeUnit.SECONDS.sleep(1); assertNotDone(watchAlls); @@ -235,12 +268,15 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> static Void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws Exception { runTestWatchRequestAsyncChangeLeader(p.startLogIndex, p.numMessages, - p.writeClient, p.watchMajorityClient, p.watchAllClient, p.watchAllCommittedClient, p.cluster, p.log); + p.writeClient, p.watchMajorityClient, p.watchAllClient, + p.watchMajorityCommittedClient, p.watchAllCommittedClient, p.cluster, p.log); return null; } - static void runTestWatchRequestAsyncChangeLeader(long startLogIndex, int numMessages, - RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient, RaftClient watchAllCommittedClient, + static void runTestWatchRequestAsyncChangeLeader( + long startLogIndex, int numMessages, + RaftClient writeClient, RaftClient watchMajorityClient, RaftClient watchAllClient, + RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient, MiniRaftCluster cluster, Logger LOG) throws Exception { // blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED final List<RaftServerImpl> followers = cluster.getFollowers(); @@ -252,6 +288,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>(); final List<CompletableFuture<RaftClientReply>> watchMajoritys = new ArrayList<>(); final List<CompletableFuture<RaftClientReply>> watchAlls = new ArrayList<>(); + final List<CompletableFuture<RaftClientReply>> watchMajorityCommitteds = new ArrayList<>(); final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new ArrayList<>(); for(int i = 0; i < numMessages; i++) { @@ -261,12 +298,15 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> replies.add(writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message))); 
watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY)); watchAlls.add(watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL)); + watchMajorityCommitteds.add( + watchMajorityCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED)); watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED)); } Assert.assertEquals(numMessages, replies.size()); Assert.assertEquals(numMessages, watchMajoritys.size()); Assert.assertEquals(numMessages, watchAlls.size()); + Assert.assertEquals(numMessages, watchMajorityCommitteds.size()); Assert.assertEquals(numMessages, watchAllCommitteds.size()); // since only one follower is blocked, requests can be committed MAJORITY but neither ALL nor ALL_COMMITTED. @@ -279,6 +319,24 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final RaftClientReply watchMajorityReply = watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply); Assert.assertTrue(watchMajoritys.get(i).get().isSuccess()); + + final RaftClientReply watchMajorityCommittedReply + = watchMajorityCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + LOG.info("watchMajorityCommittedReply({}) = ", logIndex, watchMajorityCommittedReply); + Assert.assertTrue(watchMajorityCommittedReply.isSuccess()); + { // check commit infos + final Collection<CommitInfoProto> commitInfos = watchMajorityCommittedReply.getCommitInfos(); + Assert.assertEquals(NUM_SERVERS, commitInfos.size()); + + // One follower has not committed, so min must be less than logIndex + Assert.assertTrue(commitInfos.stream() + .map(CommitInfoProto::getCommitIndex).min(Long::compare).get() < logIndex); + + // All other followers have committed + commitInfos.stream() + .map(CommitInfoProto::getCommitIndex).sorted(Long::compare) + .skip(1).forEach(ci -> Assert.assertTrue(logIndex <= ci)); + } } 
TimeUnit.SECONDS.sleep(1); assertNotDone(watchAlls);
