Repository: incubator-ratis
Updated Branches:
  refs/heads/master ca0a1271d -> 4bd32cda3


RATIS-356. Add replication level for transactions committed by majority in 
watch api.  Contributed by Jitendra Nath Pandey


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/4bd32cda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/4bd32cda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/4bd32cda

Branch: refs/heads/master
Commit: 4bd32cda3909dad92a618cbe7e194d62979978d5
Parents: ca0a127
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Fri Oct 19 12:18:57 2018 +0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Fri Oct 19 12:18:57 2018 +0800

----------------------------------------------------------------------
 ratis-proto/src/main/proto/Raft.proto           | 10 ++-
 .../apache/ratis/server/impl/LeaderState.java   | 18 +++--
 .../org/apache/ratis/WatchRequestTests.java     | 76 +++++++++++++++++---
 3 files changed, 86 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4bd32cda/ratis-proto/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 6462573..0640954 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -174,9 +174,15 @@ enum ReplicationLevel {
   /** Committed at the leader and replicated to all peers.
        Note that ReplicationLevel.ALL implies ReplicationLevel.MAJORITY. */
   ALL = 1;
+
+  /** Committed at majority peers.
+      Note that ReplicationLevel.MAJORITY_COMMITTED implies ReplicationLevel.MAJORITY. */
+  MAJORITY_COMMITTED = 2;
+
   /** Committed at all peers.
-      Note that ReplicationLevel.ALL_COMMITTED implies ReplicationLevel.ALL. */
-  ALL_COMMITTED = 2;
+      Note that ReplicationLevel.ALL_COMMITTED implies ReplicationLevel.ALL
+      and ReplicationLevel.MAJORITY_COMMITTED */
+  ALL_COMMITTED = 3;
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4bd32cda/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index c647530..41cce99 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 import java.util.stream.Stream;
 
 /**
@@ -304,15 +305,18 @@ public class LeaderState {
   }
 
   void commitIndexChanged() {
-    final LongMinMax minMax = senders.stream()
+    final long[] commitIndices = LongStream.concat(LongStream.of(
+        raftLog.getLastCommittedIndex()), senders.stream()
         .map(LogAppender::getFollower)
-        .mapToLong(FollowerInfo::getCommitIndex)
-        .collect(LongMinMax::new, LongMinMax::accumulate, LongMinMax::combine);
-    minMax.accumulate(raftLog.getLastCommittedIndex());
+        .mapToLong(FollowerInfo::getCommitIndex))
+        .sorted().toArray();
+
     // Normally, leader commit index is always ahead followers.
-    // However, after a leader change, the new leader commit index may be behind some followers in the beginning.
-    watchRequests.update(ReplicationLevel.MAJORITY, minMax.getMax());
-    watchRequests.update(ReplicationLevel.ALL_COMMITTED, minMax.getMin());
+    // However, after a leader change, the new leader commit index may
+    // be behind some followers in the beginning.
+    watchRequests.update(ReplicationLevel.MAJORITY, commitIndices[commitIndices.length-1]);
+    watchRequests.update(ReplicationLevel.ALL_COMMITTED, commitIndices[0]);
+    watchRequests.update(ReplicationLevel.MAJORITY_COMMITTED, getMajority(commitIndices));
   }
 
   private void applyOldNewConf() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4bd32cda/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index bb89bc0..9ff27ad 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -76,18 +76,22 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     final RaftClient writeClient;
     final RaftClient watchMajorityClient;
     final RaftClient watchAllClient;
+    final RaftClient watchMajorityCommittedClient;
     final RaftClient watchAllCommittedClient;
     final MiniRaftCluster cluster;
     final Logger log;
 
-    TestParameters(long startLogIndex, int numMessages, RaftClient writeClient,
-        RaftClient watchMajorityClient, RaftClient watchAllClient, RaftClient 
watchAllCommittedClient,
+    TestParameters(
+        long startLogIndex, int numMessages, RaftClient writeClient,
+        RaftClient watchMajorityClient, RaftClient watchAllClient,
+        RaftClient watchMajorityCommittedClient, RaftClient 
watchAllCommittedClient,
         MiniRaftCluster cluster, Logger log) {
       this.startLogIndex = startLogIndex;
       this.numMessages = numMessages;
       this.writeClient = writeClient;
       this.watchMajorityClient = watchMajorityClient;
       this.watchAllClient = watchAllClient;
+      this.watchMajorityCommittedClient = watchMajorityCommittedClient;
       this.watchAllCommittedClient = watchAllCommittedClient;
       this.cluster = cluster;
       this.log = log;
@@ -111,13 +115,15 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     try(final RaftClient writeClient = 
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
         final RaftClient watchMajorityClient = 
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
         final RaftClient watchAllClient = 
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
+        final RaftClient watchMajorityCommittedClient = 
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
         final RaftClient watchAllCommittedClient = 
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) {
       final int[] numMessages = {1, 10, 100};
       for(int i = 0; i < 5; i++) {
         final long logIndex = getLogIndex(writeClient) + 1;
         final int n = 
numMessages[ThreadLocalRandom.current().nextInt(numMessages.length)];
         final TestParameters p = new TestParameters(
-            logIndex, n, writeClient, watchMajorityClient, watchAllClient, 
watchAllCommittedClient, cluster, LOG);
+            logIndex, n, writeClient, watchMajorityClient, watchAllClient,
+            watchMajorityCommittedClient, watchAllCommittedClient, cluster, 
LOG);
         LOG.info("{}) {}, {}", i, p, cluster.printServers());
         testCase.apply(p);
       }
@@ -127,12 +133,15 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
 
   static Void runTestWatchRequestAsync(TestParameters p) throws Exception {
     runTestWatchRequestAsync(p.startLogIndex, p.numMessages,
-        p.writeClient, p.watchMajorityClient, p.watchAllClient, 
p.watchAllCommittedClient, p.cluster, p.log);
+        p.writeClient, p.watchMajorityClient, p.watchAllClient,
+        p.watchMajorityCommittedClient, p.watchAllCommittedClient, p.cluster, 
p.log);
     return null;
   }
 
-  static void runTestWatchRequestAsync(long startLogIndex, int numMessages,
-      RaftClient writeClient, RaftClient watchMajorityClient, RaftClient 
watchAllClient, RaftClient watchAllCommittedClient,
+  static void runTestWatchRequestAsync(
+      long startLogIndex, int numMessages,
+      RaftClient writeClient, RaftClient watchMajorityClient, RaftClient 
watchAllClient,
+      RaftClient watchMajorityCommittedClient, RaftClient 
watchAllCommittedClient,
       MiniRaftCluster cluster, Logger LOG) throws Exception {
     // blockStartTransaction of the leader so that no transaction can be 
committed MAJORITY
     final RaftServerImpl leader = cluster.getLeader();
@@ -149,6 +158,7 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
     final List<CompletableFuture<RaftClientReply>> watchMajoritys = new 
ArrayList<>();
     final List<CompletableFuture<RaftClientReply>> watchAlls = new 
ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchMajorityCommitteds = 
new ArrayList<>();
     final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new 
ArrayList<>();
 
     for(int i = 0; i < numMessages; i++) {
@@ -158,12 +168,15 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
       replies.add(writeClient.sendAsync(new 
RaftTestUtil.SimpleMessage(message)));
       watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, 
ReplicationLevel.MAJORITY));
       watchAlls.add(watchAllClient.sendWatchAsync(logIndex, 
ReplicationLevel.ALL));
+      watchMajorityCommitteds.add(watchMajorityCommittedClient.sendWatchAsync(
+          logIndex, ReplicationLevel.MAJORITY_COMMITTED));
       watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, 
ReplicationLevel.ALL_COMMITTED));
     }
 
     Assert.assertEquals(numMessages, replies.size());
     Assert.assertEquals(numMessages, watchMajoritys.size());
     Assert.assertEquals(numMessages, watchAlls.size());
+    Assert.assertEquals(numMessages, watchMajorityCommitteds.size());
     Assert.assertEquals(numMessages, watchAllCommitteds.size());
 
     // since leader is blocked, nothing can be done.
@@ -171,6 +184,7 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     assertNotDone(replies);
     assertNotDone(watchMajoritys);
     assertNotDone(watchAlls);
+    assertNotDone(watchMajorityCommitteds);
     assertNotDone(watchAllCommitteds);
 
     // unblock leader so that the transaction can be committed.
@@ -185,7 +199,26 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
       final RaftClientReply watchMajorityReply = 
watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply);
       Assert.assertTrue(watchMajoritys.get(i).get().isSuccess());
+
+      final RaftClientReply watchMajorityCommittedReply
+          = watchMajorityCommitteds.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
+      LOG.info("watchMajorityCommittedReply({}) = ", logIndex, 
watchMajorityCommittedReply);
+      Assert.assertTrue(watchMajorityCommittedReply.isSuccess());
+      { // check commit infos
+        final Collection<CommitInfoProto> commitInfos = 
watchMajorityCommittedReply.getCommitInfos();
+        Assert.assertEquals(NUM_SERVERS, commitInfos.size());
+
+        // One follower has not committed, so min must be less than logIndex
+        Assert.assertTrue(commitInfos.stream()
+            .map(CommitInfoProto::getCommitIndex).min(Long::compare).get() < 
logIndex);
+
+        // All other followers have committed
+        commitInfos.stream()
+            .map(CommitInfoProto::getCommitIndex).sorted(Long::compare)
+            .skip(1).forEach(ci -> Assert.assertTrue(logIndex <= ci));
+      }
     }
+
     // but not replicated/committed to all.
     TimeUnit.SECONDS.sleep(1);
     assertNotDone(watchAlls);
@@ -235,12 +268,15 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
 
   static Void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws 
Exception {
     runTestWatchRequestAsyncChangeLeader(p.startLogIndex, p.numMessages,
-        p.writeClient, p.watchMajorityClient, p.watchAllClient, 
p.watchAllCommittedClient, p.cluster, p.log);
+        p.writeClient, p.watchMajorityClient, p.watchAllClient,
+        p.watchMajorityCommittedClient, p.watchAllCommittedClient, p.cluster, 
p.log);
     return null;
   }
 
-  static void runTestWatchRequestAsyncChangeLeader(long startLogIndex, int 
numMessages,
-      RaftClient writeClient, RaftClient watchMajorityClient, RaftClient 
watchAllClient, RaftClient watchAllCommittedClient,
+  static void runTestWatchRequestAsyncChangeLeader(
+      long startLogIndex, int numMessages,
+      RaftClient writeClient, RaftClient watchMajorityClient, RaftClient 
watchAllClient,
+      RaftClient watchMajorityCommittedClient, RaftClient 
watchAllCommittedClient,
       MiniRaftCluster cluster, Logger LOG) throws Exception {
     // blockFlushStateMachineData a follower so that no transaction can be 
ALL_COMMITTED
     final List<RaftServerImpl> followers = cluster.getFollowers();
@@ -252,6 +288,7 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
     final List<CompletableFuture<RaftClientReply>> watchMajoritys = new 
ArrayList<>();
     final List<CompletableFuture<RaftClientReply>> watchAlls = new 
ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchMajorityCommitteds = 
new ArrayList<>();
     final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new 
ArrayList<>();
 
     for(int i = 0; i < numMessages; i++) {
@@ -261,12 +298,15 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
       replies.add(writeClient.sendAsync(new 
RaftTestUtil.SimpleMessage(message)));
       watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, 
ReplicationLevel.MAJORITY));
       watchAlls.add(watchAllClient.sendWatchAsync(logIndex, 
ReplicationLevel.ALL));
+      watchMajorityCommitteds.add(
+          watchMajorityCommittedClient.sendWatchAsync(logIndex, 
ReplicationLevel.MAJORITY_COMMITTED));
       watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, 
ReplicationLevel.ALL_COMMITTED));
     }
 
     Assert.assertEquals(numMessages, replies.size());
     Assert.assertEquals(numMessages, watchMajoritys.size());
     Assert.assertEquals(numMessages, watchAlls.size());
+    Assert.assertEquals(numMessages, watchMajorityCommitteds.size());
     Assert.assertEquals(numMessages, watchAllCommitteds.size());
 
     // since only one follower is blocked, requests can be committed MAJORITY 
but neither ALL nor ALL_COMMITTED.
@@ -279,6 +319,24 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
       final RaftClientReply watchMajorityReply = 
watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply);
       Assert.assertTrue(watchMajoritys.get(i).get().isSuccess());
+
+      final RaftClientReply watchMajorityCommittedReply
+          = watchMajorityCommitteds.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
+      LOG.info("watchMajorityCommittedReply({}) = ", logIndex, 
watchMajorityCommittedReply);
+      Assert.assertTrue(watchMajorityCommittedReply.isSuccess());
+      { // check commit infos
+        final Collection<CommitInfoProto> commitInfos = 
watchMajorityCommittedReply.getCommitInfos();
+        Assert.assertEquals(NUM_SERVERS, commitInfos.size());
+
+        // One follower has not committed, so min must be less than logIndex
+        Assert.assertTrue(commitInfos.stream()
+            .map(CommitInfoProto::getCommitIndex).min(Long::compare).get() < 
logIndex);
+
+        // All other followers have committed
+        commitInfos.stream()
+            .map(CommitInfoProto::getCommitIndex).sorted(Long::compare)
+            .skip(1).forEach(ci -> Assert.assertTrue(logIndex <= ci));
+      }
     }
     TimeUnit.SECONDS.sleep(1);
     assertNotDone(watchAlls);

Reply via email to