Repository: incubator-ratis
Updated Branches:
  refs/heads/master 9b2d7b65c -> 940a169ba


RATIS-347. Follow up works for RATIS-234.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/940a169b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/940a169b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/940a169b

Branch: refs/heads/master
Commit: 940a169bac12b10eb636d62029dddbc9fad420ff
Parents: 9b2d7b6
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Fri Oct 12 11:16:48 2018 +0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Fri Oct 12 11:16:48 2018 +0800

----------------------------------------------------------------------
 .../apache/ratis/protocol/RaftClientReply.java  |   8 +-
 .../java/org/apache/ratis/util/LongMinMax.java  |  76 ++++++++
 .../java/org/apache/ratis/util/TestMinMax.java  |  57 ++++++
 .../apache/ratis/server/impl/LeaderState.java   |  28 +--
 .../org/apache/ratis/server/impl/RoleInfo.java  |  14 +-
 .../apache/ratis/server/storage/RaftLog.java    |   9 +-
 .../ratis/server/storage/RaftLogWorker.java     |   1 +
 .../org/apache/ratis/WatchRequestTests.java     | 178 +++++++++++++++----
 8 files changed, 317 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 4c290ff..0ec9f75 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.protocol;
 
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
@@ -25,8 +26,6 @@ import org.apache.ratis.util.ReflectionUtils;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.stream.Collector;
-import java.util.stream.Collectors;
 
 /**
  * Reply from server to client
@@ -43,6 +42,11 @@ public class RaftClientReply extends RaftClientMessage {
   private final RaftException exception;
   private final Message message;
 
+  /**
+   * This field is the log index of the transaction
+   * if (1) the request is {@link RaftClientRequestProto.TypeCase#WRITE} and 
(2) the reply is success.
+   * Otherwise, this field is not used.
+   */
   private final long logIndex;
   /** The commit information when the reply is created. */
   private final Collection<CommitInfoProto> commitInfos;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-common/src/main/java/org/apache/ratis/util/LongMinMax.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LongMinMax.java 
b/ratis-common/src/main/java/org/apache/ratis/util/LongMinMax.java
new file mode 100644
index 0000000..47bc00d
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LongMinMax.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+/**
+ * Tracks the min and max of the long values passed to accumulate/combine.
+ *
+ * This class is mutable.
+ * This class is NOT thread safe.
+ */
+public class LongMinMax {
+  private long min;
+  private long max;
+  private boolean initialized = false; // becomes true once a value has been accumulated or combined in
+
+  /** @return the min; this object must be initialized (checked via Preconditions.assertTrue). */
+  public long getMin() {
+    Preconditions.assertTrue(initialized, "This LongMinMax object is 
uninitialized.");
+    return min;
+  }
+
+  /** @return the max; this object must be initialized (checked via Preconditions.assertTrue). */
+  public long getMax() {
+    Preconditions.assertTrue(initialized, "This LongMinMax object is 
uninitialized.");
+    return max;
+  }
+
+  public boolean isInitialized() { // callers can test this before calling getMin()/getMax()
+    return initialized;
+  }
+
+  /** Update min and max with the given number; the first number sets both. */
+  public void accumulate(long n) {
+    if (!initialized) {
+      min = max = n;
+      initialized = true;
+    } else if (n < min) { // min <= max holds here, so n cannot also exceed max
+      min = n;
+    } else if (n > max) {
+      max = n;
+    }
+  }
+
+  /** Merge the min and max of {@code that} into this; a no-op when {@code that} is uninitialized. */
+  public void combine(LongMinMax that) {
+    if (that.initialized) {
+      if (!this.initialized) {
+        this.min = that.min; // nothing recorded here yet: adopt the other range as-is
+        this.max = that.max;
+        this.initialized = true;
+      } else {
+        if (that.min < this.min) { // both bounds are checked independently: that may widen both ends
+          this.min = that.min;
+        }
+        if (that.max > this.max) {
+          this.max = that.max;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-common/src/test/java/org/apache/ratis/util/TestMinMax.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/test/java/org/apache/ratis/util/TestMinMax.java 
b/ratis-common/src/test/java/org/apache/ratis/util/TestMinMax.java
new file mode 100644
index 0000000..8d315b7
--- /dev/null
+++ b/ratis-common/src/test/java/org/apache/ratis/util/TestMinMax.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.LongStream;
+
+public class TestMinMax { // compares LongMinMax with LongStream.min()/max() on the same values
+  @Test(timeout = 1000)
+  public void testMinMax() {
+    runTestMinMax(LongStream.empty()); // no values at all
+    runTestMinMax(LongStream.iterate(0, n -> n).limit(10)); // ten zeros, so min == max
+    for(int count = 1; count < 10; count++) { // ascending sequences 1..count
+      runTestMinMax(LongStream.iterate(1, n -> n + 1).limit(count));
+    }
+    for(int count = 1; count < 10; count++) { // a leading 0 followed by random longs
+      runTestMinMax(LongStream.iterate(0, _dummy -> 
ThreadLocalRandom.current().nextLong()).limit(count));
+    }
+  }
+
+  static void runTestMinMax(LongStream stream) { // the stream is materialized once so it can be re-streamed
+    final List<Long> list = stream.collect(ArrayList::new, List::add, 
List::addAll);
+    final LongMinMax longMinMax = toLongStream(list).collect(LongMinMax::new, 
LongMinMax::accumulate, LongMinMax::combine);
+    if (longMinMax.isInitialized()) { // at least one value was collected
+      Assert.assertEquals(toLongStream(list).min().getAsLong(), 
longMinMax.getMin());
+      Assert.assertEquals(toLongStream(list).max().getAsLong(), 
longMinMax.getMax());
+    } else { // nothing was collected, so the list itself must have no min/max
+      Assert.assertEquals(OptionalLong.empty(), toLongStream(list).min());
+      Assert.assertEquals(OptionalLong.empty(), toLongStream(list).max());
+    }
+  }
+
+  static LongStream toLongStream(List<Long> list) { // unboxes into a fresh LongStream on every call
+    return list.stream().mapToLong(Long::longValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 48f0c1c..c647530 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -139,6 +139,10 @@ public class LeaderState {
       this.senders = new CopyOnWriteArrayList<>(senders);
     }
 
+    int size() {
+      return senders.size();
+    }
+
     Stream<LogAppender> stream() {
       return senders.stream();
     }
@@ -300,16 +304,15 @@ public class LeaderState {
   }
 
   void commitIndexChanged() {
-    final long leader = raftLog.getLastCommittedIndex();
-    final long min = senders.stream()
+    final LongMinMax minMax = senders.stream()
         .map(LogAppender::getFollower)
-        .map(FollowerInfo::getCommitIndex)
-        .min(Long::compare)
-        .orElse(leader); // it happens only if senders.isEmpty()
-    Preconditions.assertTrue(leader >= min); // leader commit index should 
always be ahead followers
-
-    watchRequests.update(ReplicationLevel.MAJORITY, leader);
-    watchRequests.update(ReplicationLevel.ALL_COMMITTED, min);
+        .mapToLong(FollowerInfo::getCommitIndex)
+        .collect(LongMinMax::new, LongMinMax::accumulate, LongMinMax::combine);
+    minMax.accumulate(raftLog.getLastCommittedIndex());
+    // Normally, the leader commit index is at or ahead of every follower's commit index.
+    // However, right after a leader change, the new leader's commit index may be 
behind those of some followers.
+    watchRequests.update(ReplicationLevel.MAJORITY, minMax.getMax());
+    watchRequests.update(ReplicationLevel.ALL_COMMITTED, minMax.getMin());
   }
 
   private void applyOldNewConf() {
@@ -509,7 +512,7 @@ public class LeaderState {
       return;
     }
 
-    final long[] indicesInNewConf = computeCommittedIndices(followers, 
includeSelf);
+    final long[] indicesInNewConf = getSortedLogIndices(followers, 
includeSelf);
     final long majorityInNewConf = getMajority(indicesInNewConf);
     final long majority;
     final long min;
@@ -524,7 +527,7 @@ public class LeaderState {
         return;
       }
 
-      final long[] indicesInOldConf = computeCommittedIndices(oldFollowers, 
includeSelfInOldConf);
+      final long[] indicesInOldConf = getSortedLogIndices(oldFollowers, 
includeSelfInOldConf);
       final long majorityInOldConf = getMajority(indicesInOldConf);
       majority = Math.min(majorityInNewConf, majorityInOldConf);
       min = Math.min(indicesInNewConf[0], indicesInOldConf[0]);
@@ -537,6 +540,7 @@ public class LeaderState {
       final TermIndex[] entriesToCommit = raftLog.getEntries(
           oldLastCommitted + 1, majority + 1);
       if (server.getState().updateStatemachine(majority, currentTerm)) {
+        watchRequests.update(ReplicationLevel.MAJORITY, majority);
         commitIndexChanged();
       }
       checkAndUpdateConfiguration(entriesToCommit);
@@ -604,7 +608,7 @@ public class LeaderState {
     return indices[(indices.length - 1) / 2];
   }
 
-  private long[] computeCommittedIndices(List<FollowerInfo> followers, boolean 
includeSelf) {
+  private long[] getSortedLogIndices(List<FollowerInfo> followers, boolean 
includeSelf) {
     final int length = includeSelf ? followers.size() + 1 : followers.size();
     if (length == 0) {
       throw new IllegalArgumentException("followers.size() == "

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index eb25c71..42635f9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
 /**
  * Maintain the Role of a Raft Peer.
  */
-public class RoleInfo {
+class RoleInfo {
   public static final Logger LOG = LoggerFactory.getLogger(RoleInfo.class);
 
   private final RaftPeerId id;
@@ -53,28 +53,28 @@ public class RoleInfo {
     this.transitionTime = new AtomicReference<>(new Timestamp());
   }
 
-  public void transitionRole(RaftPeerRole newRole) {
+  void transitionRole(RaftPeerRole newRole) {
     this.role = newRole;
     this.transitionTime.set(new Timestamp());
   }
 
-  public long getRoleElapsedTimeMs() {
+  long getRoleElapsedTimeMs() {
     return transitionTime.get().elapsedTimeMs();
   }
 
-  public RaftPeerRole getCurrentRole() {
+  RaftPeerRole getCurrentRole() {
     return role;
   }
 
-  public boolean isFollower() {
+  boolean isFollower() {
     return role == RaftPeerRole.FOLLOWER;
   }
 
-  public boolean isCandidate() {
+  boolean isCandidate() {
     return role == RaftPeerRole.CANDIDATE;
   }
 
-  public boolean isLeader() {
+  boolean isLeader() {
     return role == RaftPeerRole.LEADER;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index b5a38fd..c1b05f6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -89,14 +89,17 @@ public abstract class RaftLog implements Closeable {
    */
   public boolean updateLastCommitted(long majorityIndex, long currentTerm) {
     try(AutoCloseableLock writeLock = writeLock()) {
-      if (lastCommitted.get() < majorityIndex) {
+      final long oldCommittedIndex = lastCommitted.get();
+      if (oldCommittedIndex < majorityIndex) {
         // Only update last committed index for current term. See §5.4.2 in
         // paper for details.
         final TermIndex entry = getTermIndex(majorityIndex);
         if (entry != null && entry.getTerm() == currentTerm) {
           final long commitIndex = Math.min(majorityIndex, 
getLatestFlushedIndex());
-          LOG.debug("{}: Updating lastCommitted to {}", selfId, commitIndex);
-          lastCommitted.set(commitIndex);
+          if (commitIndex > oldCommittedIndex) {
+            LOG.debug("{}: updateLastCommitted {} -> {}", selfId, 
oldCommittedIndex, commitIndex);
+            lastCommitted.set(commitIndex);
+          }
           return true;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index 715370b..30df276 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -238,6 +238,7 @@ class RaftLogWorker implements Runnable {
   }
 
   private void updateFlushedIndex() {
+    LOG.debug("{}: updateFlushedIndex {} -> {}", name, flushedIndex, 
lastWrittenIndex); // old -> new, logged before the assignment below
     flushedIndex = lastWrittenIndex;
     pendingFlushNum = 0;
     submitUpdateCommitEvent.run();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/940a169b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java 
b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 5014150..bb89bc0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -27,7 +27,7 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.CheckedFunction;
 import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
@@ -66,49 +66,74 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     LOG.info("Running testWatchRequests");
     try(final CLUSTER cluster = newCluster(NUM_SERVERS)) {
       cluster.start();
-      runTestWatchRequestAsync(cluster, LOG);
+      runTest(WatchRequestTests::runTestWatchRequestAsync, cluster, LOG);
     }
   }
 
-  static void runTestWatchRequestAsync(MiniRaftCluster cluster, Logger LOG) 
throws Exception {
+  static class TestParameters {
+    final long startLogIndex;
+    final int numMessages;
+    final RaftClient writeClient;
+    final RaftClient watchMajorityClient;
+    final RaftClient watchAllClient;
+    final RaftClient watchAllCommittedClient;
+    final MiniRaftCluster cluster;
+    final Logger log;
+
+    TestParameters(long startLogIndex, int numMessages, RaftClient writeClient,
+        RaftClient watchMajorityClient, RaftClient watchAllClient, RaftClient 
watchAllCommittedClient,
+        MiniRaftCluster cluster, Logger log) {
+      this.startLogIndex = startLogIndex;
+      this.numMessages = numMessages;
+      this.writeClient = writeClient;
+      this.watchMajorityClient = watchMajorityClient;
+      this.watchAllClient = watchAllClient;
+      this.watchAllCommittedClient = watchAllCommittedClient;
+      this.cluster = cluster;
+      this.log = log;
+    }
+
+    @Override
+    public String toString() {
+      return "startLogIndex=" + startLogIndex + ", numMessages=" + numMessages;
+    }
+  }
+
+  static long getLogIndex(RaftClient writeClient) throws Exception {
+    // send a message in order to get the current log index
+    final RaftTestUtil.SimpleMessage message = new 
RaftTestUtil.SimpleMessage("getLogIndex");
+    final RaftClientReply reply = 
writeClient.sendAsync(message).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+    Assert.assertTrue(reply.isSuccess());
+    return reply.getLogIndex();
+  }
+
+  static void runTest(CheckedFunction<TestParameters, Void, Exception> 
testCase, MiniRaftCluster cluster, Logger LOG) throws Exception {
     try(final RaftClient writeClient = 
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
         final RaftClient watchMajorityClient = 
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
         final RaftClient watchAllClient = 
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
         final RaftClient watchAllCommittedClient = 
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) {
-      long logIndex;
-      {
-        // send the first message
-        final RaftTestUtil.SimpleMessage message = new 
RaftTestUtil.SimpleMessage("message");
-        final RaftClientReply reply = 
writeClient.sendAsync(message).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-        Assert.assertTrue(reply.isSuccess());
-        logIndex = reply.getLogIndex();
-
-        final List<CompletableFuture<Void>> futures = new ArrayList<>();
-        futures.add(watchMajorityClient.sendWatchAsync(logIndex, 
ReplicationLevel.MAJORITY)
-            .thenAccept(r -> Assert.assertTrue(r.isSuccess())));
-        futures.add(watchAllClient.sendWatchAsync(logIndex, 
ReplicationLevel.ALL)
-            .thenAccept(r -> Assert.assertTrue(r.isSuccess())));
-        futures.add(watchAllCommittedClient.sendWatchAsync(logIndex, 
ReplicationLevel.ALL_COMMITTED)
-            .thenAccept(r -> Assert.assertTrue(r.isSuccess())));
-        JavaUtils.allOf(futures).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      }
-      logIndex++;
-
+      final int[] numMessages = {1, 10, 100};
       for(int i = 0; i < 5; i++) {
-        final int numMessages = ThreadLocalRandom.current().nextInt(10) + 1;
-        runTestWatchRequestAsync(logIndex, numMessages, writeClient, 
watchMajorityClient, watchAllClient, watchAllCommittedClient, cluster, LOG);
-        logIndex += numMessages;
+        final long logIndex = getLogIndex(writeClient) + 1;
+        final int n = 
numMessages[ThreadLocalRandom.current().nextInt(numMessages.length)];
+        final TestParameters p = new TestParameters(
+            logIndex, n, writeClient, watchMajorityClient, watchAllClient, 
watchAllCommittedClient, cluster, LOG);
+        LOG.info("{}) {}, {}", i, p, cluster.printServers());
+        testCase.apply(p);
       }
 
-      LOG.info(cluster.printServers());
     }
   }
 
+  static Void runTestWatchRequestAsync(TestParameters p) throws Exception {
+    runTestWatchRequestAsync(p.startLogIndex, p.numMessages,
+        p.writeClient, p.watchMajorityClient, p.watchAllClient, 
p.watchAllCommittedClient, p.cluster, p.log);
+    return null;
+  }
+
   static void runTestWatchRequestAsync(long startLogIndex, int numMessages,
       RaftClient writeClient, RaftClient watchMajorityClient, RaftClient 
watchAllClient, RaftClient watchAllCommittedClient,
       MiniRaftCluster cluster, Logger LOG) throws Exception {
-    LOG.info("runTestWatchRequestAsync: startLogIndex={}, numMessages={}", 
startLogIndex, numMessages);
-
     // blockStartTransaction of the leader so that no transaction can be 
committed MAJORITY
     final RaftServerImpl leader = cluster.getLeader();
     LOG.info("block leader {}", leader.getId());
@@ -177,7 +202,7 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
       Assert.assertTrue(watchAllReply.isSuccess());
 
       final RaftClientReply watchAllCommittedReply = 
watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      LOG.info("watchAllCommittedReply({}) = ", logIndex, 
watchAllCommittedReply);
+      LOG.info("watchAllCommittedReply({}) = {}", logIndex, 
watchAllCommittedReply);
       Assert.assertTrue(watchAllCommittedReply.isSuccess());
       { // check commit infos
         final Collection<CommitInfoProto> commitInfos = 
watchAllCommittedReply.getCommitInfos();
@@ -188,6 +213,99 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
   }
 
   static <T> void assertNotDone(List<CompletableFuture<T>> futures) {
-    futures.forEach(f -> Assert.assertFalse(f.isDone()));
+    futures.forEach(f -> {
+      if (f.isDone()) {
+        try {
+          Assert.fail("Done unexpectedly: " + f.get());
+        } catch(Exception e) {
+          Assert.fail("Done unexpectedly and failed to get: " + e);
+        }
+      }
+    });
+  }
+
+  @Test
+  public void testWatchRequestAsyncChangeLeader() throws Exception {
+    LOG.info("Running testWatchRequestAsyncChangeLeader");
+    try(final CLUSTER cluster = newCluster(NUM_SERVERS)) {
+      cluster.start();
+      runTest(WatchRequestTests::runTestWatchRequestAsyncChangeLeader, 
cluster, LOG);
+    }
+  }
+
+  static Void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws 
Exception {
+    runTestWatchRequestAsyncChangeLeader(p.startLogIndex, p.numMessages,
+        p.writeClient, p.watchMajorityClient, p.watchAllClient, 
p.watchAllCommittedClient, p.cluster, p.log);
+    return null;
+  }
+
+  static void runTestWatchRequestAsyncChangeLeader(long startLogIndex, int 
numMessages,
+      RaftClient writeClient, RaftClient watchMajorityClient, RaftClient 
watchAllClient, RaftClient watchAllCommittedClient,
+      MiniRaftCluster cluster, Logger LOG) throws Exception {
+    // blockFlushStateMachineData a follower so that no transaction can be 
ALL_COMMITTED
+    final List<RaftServerImpl> followers = cluster.getFollowers();
+    final RaftServerImpl blockedFollower = 
followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
+    LOG.info("block follower {}", blockedFollower.getId());
+    
SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();
+
+    // send numMessages messages together with their watch requests
+    final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchMajoritys = new 
ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchAlls = new 
ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new 
ArrayList<>();
+
+    for(int i = 0; i < numMessages; i++) {
+      final long logIndex = startLogIndex + i;
+      final String message = "m" + logIndex;
+      LOG.info("SEND_REQUEST {}: logIndex={}, message={}", i, logIndex, 
message);
+      replies.add(writeClient.sendAsync(new 
RaftTestUtil.SimpleMessage(message)));
+      watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, 
ReplicationLevel.MAJORITY));
+      watchAlls.add(watchAllClient.sendWatchAsync(logIndex, 
ReplicationLevel.ALL));
+      watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, 
ReplicationLevel.ALL_COMMITTED));
+    }
+
+    Assert.assertEquals(numMessages, replies.size());
+    Assert.assertEquals(numMessages, watchMajoritys.size());
+    Assert.assertEquals(numMessages, watchAlls.size());
+    Assert.assertEquals(numMessages, watchAllCommitteds.size());
+
+    // since only one follower is blocked, requests can be committed MAJORITY 
but neither ALL nor ALL_COMMITTED.
+    for(int i = 0; i < numMessages; i++) {
+      final long logIndex = startLogIndex + i;
+      LOG.info("UNBLOCK_F1 {}: logIndex={}", i, logIndex);
+      final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
+      Assert.assertTrue(reply.isSuccess());
+      Assert.assertEquals(logIndex, reply.getLogIndex());
+      final RaftClientReply watchMajorityReply = 
watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply);
+      Assert.assertTrue(watchMajoritys.get(i).get().isSuccess());
+    }
+    TimeUnit.SECONDS.sleep(1);
+    assertNotDone(watchAlls);
+    assertNotDone(watchAllCommitteds);
+
+    // Now change leader
+    RaftTestUtil.changeLeader(cluster, cluster.getLeader().getId());
+
+    // unblock follower so that the transaction can be replicated and 
committed to all.
+    
SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
+    LOG.info("unblock follower {}", blockedFollower.getId());
+    for(int i = 0; i < numMessages; i++) {
+      final long logIndex = startLogIndex + i;
+      LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex);
+      final RaftClientReply watchAllReply = 
watchAlls.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply);
+      Assert.assertTrue(watchAllReply.isSuccess());
+
+      final RaftClientReply watchAllCommittedReply = 
watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      LOG.info("watchAllCommittedReply({}) = {}", logIndex, 
watchAllCommittedReply);
+      Assert.assertTrue(watchAllCommittedReply.isSuccess());
+      { // check commit infos
+        final Collection<CommitInfoProto> commitInfos = 
watchAllCommittedReply.getCommitInfos();
+        Assert.assertEquals(NUM_SERVERS, commitInfos.size());
+        commitInfos.forEach(info -> Assert.assertTrue(logIndex <= 
info.getCommitIndex()));
+      }
+    }
   }
+
 }

Reply via email to