Repository: incubator-ratis
Updated Branches:
  refs/heads/master 2fb45da94 -> 96a3b1bfe


RATIS-409. Watch requests may not work if there is a conf change.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/96a3b1bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/96a3b1bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/96a3b1bf

Branch: refs/heads/master
Commit: 96a3b1bfeda319e8543da3dbcd2afadd5468a803
Parents: 2fb45da
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Sat Nov 17 15:52:52 2018 -0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Sat Nov 17 15:52:52 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/util/JavaUtils.java   |   5 +
 .../ratis/grpc/server/GrpcLogAppender.java      |  18 ++-
 .../grpc/server/GrpcServerProtocolService.java  |   4 +
 .../apache/ratis/server/impl/FollowerInfo.java  |  65 ++++++----
 .../apache/ratis/server/impl/LeaderState.java   |  92 ++++++++-----
 .../apache/ratis/server/impl/LogAppender.java   |   5 +-
 .../ratis/server/impl/RaftServerImpl.java       |   6 +-
 .../ratis/server/storage/RaftLogIndex.java      |  76 +++++++++++
 .../java/org/apache/ratis/MiniRaftCluster.java  |  15 +++
 .../org/apache/ratis/WatchRequestTests.java     | 130 ++++++-------------
 .../ratis/server/impl/RaftServerTestUtil.java   |   6 +
 11 files changed, 262 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 95fcf35..f3b0a0d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -67,6 +67,11 @@ public interface JavaUtils {
     return t;
   }
 
+  static StackTraceElement getCallerStackTraceElement() {
+    final StackTraceElement[] trace = Thread.currentThread().getStackTrace();
+    return trace[3];
+  }
+
   /**
    * Invoke {@link Callable#call()} and, if there any,
    * wrap the checked exception by {@link RuntimeException}.

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 11ff4d8..ed96468 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -206,10 +206,19 @@ public class GrpcLogAppender extends LogAppender {
      */
     @Override
     public void onNext(AppendEntriesReplyProto reply) {
-      LOG.debug("{} received {} response from {}", server.getId(),
-          (!firstResponseReceived ? "the first" : "a"),
-          follower.getPeer());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}<-{}: received {} reply {} ", server.getId(), 
follower.getPeer(),
+            (!firstResponseReceived? "the first": "a"), 
ServerProtoUtils.toString(reply));
+      }
+
+      try {
+        onNextImpl(reply);
+      } catch(Throwable t) {
+        LOG.error("Failed onNext " + reply, t);
+      }
+    }
 
+    private void onNextImpl(AppendEntriesReplyProto reply) {
       // update the last rpc time
       follower.updateLastRpcResponseTime();
 
@@ -428,8 +437,7 @@ public class GrpcLogAppender extends LogAppender {
     }
 
     if (responseHandler.hasAllResponse()) {
-      follower.updateMatchIndex(snapshot.getTermIndex().getIndex());
-      follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1);
+      follower.setSnapshotIndex(snapshot.getTermIndex().getIndex());
       LOG.info("{}: install snapshot-{} successfully on follower {}",
           server.getId(), snapshot.getTermIndex().getIndex(), 
follower.getPeer());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 12a717a..5c5bd66 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -20,6 +20,7 @@ package org.apache.ratis.grpc.server;
 import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.*;
 import 
org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
@@ -76,6 +77,9 @@ public class GrpcServerProtocolService extends 
RaftServerProtocolServiceImplBase
           server.appendEntriesAsync(request).thenCombine(previous,
               (reply, v) -> {
             if (!isClosed.get()) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(server.getId() + ": reply " + 
ServerProtoUtils.toString(reply));
+              }
               responseObserver.onNext(reply);
             }
             current.complete(null);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index 254319a..94bd7c4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -18,71 +18,84 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.storage.RaftLogIndex;
 import org.apache.ratis.util.Timestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 public class FollowerInfo {
+  public static final Logger LOG = LoggerFactory.getLogger(FollowerInfo.class);
+
+  private final String name;
+  private final Consumer<Object> infoIndexChange;
+  private final Consumer<Object> debugIndexChange;
+
   private final RaftPeer peer;
   private final AtomicReference<Timestamp> lastRpcResponseTime;
   private final AtomicReference<Timestamp> lastRpcSendTime;
-  private long nextIndex;
-  private final AtomicLong matchIndex;
-  private final AtomicLong commitIndex = new 
AtomicLong(RaftServerConstants.INVALID_LOG_INDEX);
+  private final RaftLogIndex nextIndex;
+  private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", 0L);
+  private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", 
RaftServerConstants.INVALID_LOG_INDEX);
   private volatile boolean attendVote;
   private final int rpcSlownessTimeoutMs;
 
-  FollowerInfo(RaftPeer peer, Timestamp lastRpcTime, long nextIndex,
+
+  FollowerInfo(RaftPeerId id, RaftPeer peer, Timestamp lastRpcTime, long 
nextIndex,
       boolean attendVote, int rpcSlownessTimeoutMs) {
+    this.name = id + "->" + peer.getId();
+    this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
+    this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
+
     this.peer = peer;
     this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime);
     this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
-    this.nextIndex = nextIndex;
-    this.matchIndex = new AtomicLong(0);
+    this.nextIndex = new RaftLogIndex("nextIndex", nextIndex);
     this.attendVote = attendVote;
     this.rpcSlownessTimeoutMs = rpcSlownessTimeoutMs;
   }
 
-  public void updateMatchIndex(final long matchIndex) {
-    this.matchIndex.set(matchIndex);
-  }
-
   public long getMatchIndex() {
     return matchIndex.get();
   }
 
+  public void updateMatchIndex(long newMatchIndex) {
+    matchIndex.updateIncreasingly(newMatchIndex, debugIndexChange);
+  }
+
   /** @return the commit index acked by the follower. */
   long getCommitIndex() {
     return commitIndex.get();
   }
 
   boolean updateCommitIndex(long newCommitIndex) {
-    final long old = commitIndex.getAndUpdate(oldCommitIndex -> 
newCommitIndex);
-    Preconditions.assertTrue(newCommitIndex >= old,
-        () -> "newCommitIndex = " + newCommitIndex + " < old = " + old);
-    return old != newCommitIndex;
+    return commitIndex.updateToMax(newCommitIndex, debugIndexChange);
+  }
+
+  public long getNextIndex() {
+    return nextIndex.get();
   }
 
-  public synchronized long getNextIndex() {
-    return nextIndex;
+  public void updateNextIndex(long newNextIndex) {
+    nextIndex.updateIncreasingly(newNextIndex, debugIndexChange);
   }
 
-  public synchronized void updateNextIndex(long i) {
-    nextIndex = i;
+  public void decreaseNextIndex(long newNextIndex) {
+    nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1, 
newNextIndex), infoIndexChange);
   }
 
-  public synchronized void decreaseNextIndex(long targetIndex) {
-    if (nextIndex > 0) {
-      nextIndex = Math.min(nextIndex - 1, targetIndex);
-    }
+  public void setSnapshotIndex(long snapshotIndex) {
+    matchIndex.setUnconditionally(snapshotIndex, infoIndexChange);
+    nextIndex.setUnconditionally(snapshotIndex + 1, infoIndexChange);
   }
 
   @Override
   public String toString() {
-    return peer.getId() + "(next=" + nextIndex + ", match=" + matchIndex + "," 
+
-        " attendVote=" + attendVote +
+    return name + "(c" + getCommitIndex() + ",m" + getMatchIndex() + ",n" + 
getNextIndex()
+        + ", attendVote=" + attendVote +
         ", lastRpcSendTime=" + lastRpcSendTime.get().elapsedTimeMs() +
         ", lastRpcResponseTime=" + lastRpcResponseTime.get().elapsedTimeMs() + 
")";
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index c4c4fe2..46c3ac1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -38,9 +38,10 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.function.LongSupplier;
 import java.util.function.Predicate;
+import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
-import java.util.stream.LongStream;
 import java.util.stream.Stream;
 
 /**
@@ -302,18 +303,14 @@ public class LeaderState {
   }
 
   void commitIndexChanged() {
-    final long[] commitIndices = LongStream.concat(LongStream.of(
-        raftLog.getLastCommittedIndex()), senders.stream()
-        .map(LogAppender::getFollower)
-        .mapToLong(FollowerInfo::getCommitIndex))
-        .sorted().toArray();
-
-    // Normally, leader commit index is always ahead followers.
-    // However, after a leader change, the new leader commit index may
-    // be behind some followers in the beginning.
-    watchRequests.update(ReplicationLevel.MAJORITY, 
commitIndices[commitIndices.length-1]);
-    watchRequests.update(ReplicationLevel.ALL_COMMITTED, commitIndices[0]);
-    watchRequests.update(ReplicationLevel.MAJORITY_COMMITTED, 
getMajority(commitIndices));
+    getMajorityMin(FollowerInfo::getCommitIndex, 
raftLog::getLastCommittedIndex).ifPresent(m -> {
+      // Normally, leader commit index is always ahead followers.
+      // However, after a leader change, the new leader commit index may
+      // be behind some followers in the beginning.
+      watchRequests.update(ReplicationLevel.ALL_COMMITTED, m.min);
+      watchRequests.update(ReplicationLevel.MAJORITY_COMMITTED, m.majority);
+      watchRequests.update(ReplicationLevel.MAJORITY, m.max);
+    });
   }
 
   private void applyOldNewConf() {
@@ -503,37 +500,71 @@ public class LeaderState {
     eventQueue.submit(UPDATE_COMMIT_EVENT);
   }
 
+  static class MinMajorityMax {
+    private final long min;
+    private final long majority;
+    private final long max;
+
+    MinMajorityMax(long min, long majority, long max) {
+      this.min = min;
+      this.majority = majority;
+      this.max = max;
+    }
+
+    MinMajorityMax combine(MinMajorityMax that) {
+      return new MinMajorityMax(
+          Math.min(this.min, that.min),
+          Math.min(this.majority, that.majority),
+          Math.min(this.max, that.max));
+    }
+
+    static MinMajorityMax valueOf(long[] sorted) {
+      return new MinMajorityMax(sorted[0], getMajority(sorted), 
getMax(sorted));
+    }
+
+    static long getMajority(long[] sorted) {
+      return sorted[(sorted.length - 1) / 2];
+    }
+
+    static long getMax(long[] sorted) {
+      return sorted[sorted.length - 1];
+    }
+  }
+
   private void updateCommit() {
+    getMajorityMin(FollowerInfo::getMatchIndex, raftLog::getLatestFlushedIndex)
+        .ifPresent(m -> updateCommit(m.majority, m.min));
+  }
+
+  private Optional<MinMajorityMax> getMajorityMin(ToLongFunction<FollowerInfo> 
followerIndex, LongSupplier logIndex) {
     final RaftPeerId selfId = server.getId();
     final RaftConfiguration conf = server.getRaftConf();
 
     final List<FollowerInfo> followers = voterLists.get(0);
     final boolean includeSelf = conf.containsInConf(selfId);
     if (followers.isEmpty() && !includeSelf) {
-      return;
+      return Optional.empty();
     }
 
-    final long[] indicesInNewConf = getSortedLogIndices(followers, 
includeSelf);
-    final long majorityInNewConf = getMajority(indicesInNewConf);
-    final long majority;
-    final long min;
+    final long[] indicesInNewConf = getSorted(followers, includeSelf, 
followerIndex, logIndex);
+    final MinMajorityMax newConf = MinMajorityMax.valueOf(indicesInNewConf);
 
     if (!conf.isTransitional()) {
-      majority = majorityInNewConf;
-      min = indicesInNewConf[0];
+      return Optional.of(newConf);
     } else { // configuration is in transitional state
       final List<FollowerInfo> oldFollowers = voterLists.get(1);
       final boolean includeSelfInOldConf = conf.containsInOldConf(selfId);
       if (oldFollowers.isEmpty() && !includeSelfInOldConf) {
-        return;
+        return Optional.empty();
       }
 
-      final long[] indicesInOldConf = getSortedLogIndices(oldFollowers, 
includeSelfInOldConf);
-      final long majorityInOldConf = getMajority(indicesInOldConf);
-      majority = Math.min(majorityInNewConf, majorityInOldConf);
-      min = Math.min(indicesInNewConf[0], indicesInOldConf[0]);
+      final long[] indicesInOldConf = getSorted(oldFollowers, 
includeSelfInOldConf, followerIndex, logIndex);
+      final MinMajorityMax oldConf = MinMajorityMax.valueOf(indicesInOldConf);
+      return Optional.of(newConf.combine(oldConf));
     }
+  }
 
+  private void updateCommit(long majority, long min) {
     final long oldLastCommitted = raftLog.getLastCommittedIndex();
     if (majority > oldLastCommitted) {
       // copy the entries out from the raftlog, in order to prevent that
@@ -605,11 +636,8 @@ public class LeaderState {
     notifySenders();
   }
 
-  static long getMajority(long[] indices) {
-    return indices[(indices.length - 1) / 2];
-  }
-
-  private long[] getSortedLogIndices(List<FollowerInfo> followers, boolean 
includeSelf) {
+  private static long[] getSorted(List<FollowerInfo> followers, boolean 
includeSelf,
+      ToLongFunction<FollowerInfo> getFollowerIndex, LongSupplier getLogIndex) 
{
     final int length = includeSelf ? followers.size() + 1 : followers.size();
     if (length == 0) {
       throw new IllegalArgumentException("followers.size() == "
@@ -617,11 +645,11 @@ public class LeaderState {
     }
     final long[] indices = new long[length];
     for (int i = 0; i < followers.size(); i++) {
-      indices[i] = followers.get(i).getMatchIndex();
+      indices[i] = getFollowerIndex.applyAsLong(followers.get(i));
     }
     if (includeSelf) {
       // note that we also need to wait for the local disk I/O
-      indices[length - 1] = raftLog.getLatestFlushedIndex();
+      indices[length - 1] = getLogIndex.getAsLong();
     }
 
     Arrays.sort(indices);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 02e0f56..6cb8538 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -262,7 +262,7 @@ public class LogAppender {
 
   protected void updateCommitIndex(long commitIndex) {
     if (follower.updateCommitIndex(commitIndex)) {
-      server.commitIndexChanged();
+      leaderState.commitIndexChanged();
     }
   }
 
@@ -398,8 +398,7 @@ public class LogAppender {
     }
 
     if (reply != null) {
-      follower.updateMatchIndex(snapshot.getTermIndex().getIndex());
-      follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1);
+      follower.setSnapshotIndex(snapshot.getTermIndex().getIndex());
       LOG.info("{}: install snapshot-{} successfully on follower {}",
           server.getId(), snapshot.getTermIndex().getIndex(), 
follower.getPeer());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 21c2b59..a67b4c5 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -115,7 +115,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
   LogAppender newLogAppender(
       LeaderState state, RaftPeer peer, Timestamp lastRpcTime, long nextIndex,
       boolean attendVote) {
-    final FollowerInfo f = new FollowerInfo(peer, lastRpcTime, nextIndex, 
attendVote, rpcSlownessTimeoutMs);
+    final FollowerInfo f = new FollowerInfo(getId(), peer, lastRpcTime, 
nextIndex, attendVote, rpcSlownessTimeoutMs);
     return getProxy().getFactory().newLogAppender(this, state, f);
   }
 
@@ -1017,10 +1017,6 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
         groupId, term, lastEntry);
   }
 
-  void commitIndexChanged() {
-    role.getLeaderState().ifPresent(LeaderState::commitIndexChanged);
-  }
-
   public void submitUpdateCommitEvent() {
     role.getLeaderState().ifPresent(LeaderState::submitUpdateCommitEvent);
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java
new file mode 100644
index 0000000..a7dd1a7
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.StringUtils;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.LongUnaryOperator;
+
+/**
+ * Indices of {@link RaftLog} such as commit index, match index, etc.
+ *
+ * This class is thread safe.
+ */
+public class RaftLogIndex {
+  private final Object name;
+  private final AtomicLong index;
+
+  public RaftLogIndex(Object name, long initialValue) {
+    this.name = name;
+    this.index = new AtomicLong(initialValue);
+  }
+
+  public long get() {
+    return index.get();
+  }
+
+  public boolean setUnconditionally(long newIndex, Consumer<Object> log) {
+    final long old = index.getAndSet(newIndex);
+    log.accept(StringUtils.stringSupplierAsObject(() -> name + ": 
setUnconditionally " + old + " -> " + newIndex));
+    return old != newIndex;
+  }
+
+  public boolean updateUnconditionally(LongUnaryOperator update, 
Consumer<Object> log) {
+    final long old = index.getAndUpdate(update);
+    final long newIndex = update.applyAsLong(old);
+    log.accept(StringUtils.stringSupplierAsObject(() -> name + ": 
updateUnconditionally " + old + " -> " + newIndex));
+    return old != newIndex;
+  }
+
+  public boolean updateIncreasingly(long newIndex, Consumer<Object> log) {
+    final long old = index.getAndSet(newIndex);
+    Preconditions.assertTrue(old <= newIndex,
+        () -> "Failed to updateIncreasingly for " + name + ": " + old + " -> " 
+ newIndex);
+    log.accept(StringUtils.stringSupplierAsObject(() -> name + ": 
updateIncreasingly " + old + " -> " + newIndex));
+    return old != newIndex;
+  }
+
+  public boolean updateToMax(long newIndex, Consumer<Object> log) {
+    final long old = index.getAndUpdate(oldIndex -> Math.max(oldIndex, 
newIndex));
+    log.accept(StringUtils.stringSupplierAsObject(() -> name + ": updateToMax 
" + old + " -> " + newIndex));
+    return old != newIndex;
+  }
+
+  @Override
+  public String toString() {
+    return name + ":" + index;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java 
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 0cf9449..9d08bd0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -79,6 +79,21 @@ public abstract class MiniRaftCluster implements Closeable {
       default CLUSTER newCluster(int numPeers) throws IOException {
         return getFactory().newCluster(numPeers, getProperties());
       }
+
+      default void runWithNewCluster(int numServers, CheckedConsumer<CLUSTER, 
Exception> testCase) throws Exception {
+        final StackTraceElement caller = 
JavaUtils.getCallerStackTraceElement();
+        LOG.info("Running " + caller.getMethodName());
+        final CLUSTER cluster = newCluster(numServers);
+        try {
+          cluster.start();
+          testCase.accept(cluster);
+        } catch(Throwable t) {
+          LOG.error("Failed " + caller + ": " + cluster.printServers(), t);
+          throw t;
+        } finally {
+          cluster.shutdown();
+        }
+      }
     }
 
     public abstract CLUSTER newCluster(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java 
b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index d8856e7..dbe5865 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -25,10 +25,12 @@ import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.CheckedFunction;
 import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.junit.Before;
@@ -41,15 +43,14 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
     extends BaseTest
     implements MiniRaftCluster.Factory.Get<CLUSTER> {
-  static {
+  {
+    RaftServerTestUtil.setWatchRequestsLogLevel(Level.DEBUG);
     LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
-    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
 
   static final int NUM_SERVERS = 3;
@@ -64,11 +65,7 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
 
   @Test
   public void testWatchRequestAsync() throws Exception {
-    LOG.info("Running testWatchRequests");
-    try(final CLUSTER cluster = newCluster(NUM_SERVERS)) {
-      cluster.start();
-      runTest(WatchRequestTests::runTestWatchRequestAsync, cluster, LOG);
-    }
+    runWithNewCluster(NUM_SERVERS, cluster -> 
runTest(WatchRequestTests::runTestWatchRequestAsync, cluster, LOG));
   }
 
   static class TestParameters {
@@ -123,14 +120,6 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     }
   }
 
-  static long getLogIndex(RaftClient writeClient) throws Exception {
-    // send a message in order to get the current log index
-    final RaftTestUtil.SimpleMessage message = new 
RaftTestUtil.SimpleMessage("getLogIndex");
-    final RaftClientReply reply = 
writeClient.sendAsync(message).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-    Assert.assertTrue(reply.isSuccess());
-    return reply.getLogIndex();
-  }
-
   static void runTest(CheckedFunction<TestParameters, Void, Exception> 
testCase, MiniRaftCluster cluster, Logger LOG) throws Exception {
     try(final RaftClient writeClient = 
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
         final RaftClient watchMajorityClient = 
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
@@ -146,7 +135,6 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
         LOG.info("{}) {}, {}", i, p, cluster.printServers());
         testCase.apply(p);
       }
-
     }
   }
 
@@ -201,10 +189,29 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     // unblock leader so that the transaction can be committed.
     SimpleStateMachine4Testing.get(leader).unblockStartTransaction();
     LOG.info("unblock leader {}", leader.getId());
-    for(int i = 0; i < numMessages; i++) {
+
+    checkMajority(replies, watches, LOG);
+
+    Assert.assertEquals(numMessages, watches.size());
+
+    // but not replicated/committed to all.
+    TimeUnit.SECONDS.sleep(1);
+    assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> 
w.all));
+    assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> 
w.allCommitted));
+
+    // unblock follower so that the transaction can be replicated and 
committed to all.
+    LOG.info("unblock follower {}", blockedFollower.getId());
+    
SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
+    checkAll(watches, LOG);
+    return null;
+  }
+
+  static void checkMajority(List<CompletableFuture<RaftClientReply>> replies,
+      List<CompletableFuture<WatchReplies>> watches, Logger LOG) throws 
Exception {
+    for(int i = 0; i < replies.size(); i++) {
       final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
+      LOG.info("checkMajority {}: receive {}", i, reply);
       final long logIndex = reply.getLogIndex();
-      LOG.info("{}: receive reply for logIndex={}", i, logIndex);
       Assert.assertTrue(reply.isSuccess());
 
       final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
@@ -215,37 +222,30 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
 
       final RaftClientReply watchMajorityCommittedReply
           = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
-      LOG.info("watchMajorityCommittedReply({}) = ", logIndex, 
watchMajorityCommittedReply);
+      LOG.info("watchMajorityCommittedReply({}) = {}", logIndex, 
watchMajorityCommittedReply);
       Assert.assertTrue(watchMajorityCommittedReply.isSuccess());
       { // check commit infos
         final Collection<CommitInfoProto> commitInfos = 
watchMajorityCommittedReply.getCommitInfos();
+        final String message = "logIndex=" + logIndex + ", " + 
ProtoUtils.toString(commitInfos);
         Assert.assertEquals(NUM_SERVERS, commitInfos.size());
 
         // One follower has not committed, so min must be less than logIndex
-        Assert.assertTrue(commitInfos.stream()
-            .map(CommitInfoProto::getCommitIndex).min(Long::compare).get() < 
logIndex);
+        final long min = 
commitInfos.stream().map(CommitInfoProto::getCommitIndex).min(Long::compare).get();
+        Assert.assertTrue(message, logIndex > min);
 
         // All other followers have committed
         commitInfos.stream()
             .map(CommitInfoProto::getCommitIndex).sorted(Long::compare)
-            .skip(1).forEach(ci -> Assert.assertTrue(logIndex <= ci));
+            .skip(1).forEach(ci -> Assert.assertTrue(message, logIndex <= ci));
       }
     }
+  }
 
-    Assert.assertEquals(numMessages, watches.size());
-
-    // but not replicated/committed to all.
-    TimeUnit.SECONDS.sleep(1);
-    assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> 
w.all));
-    assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> 
w.allCommitted));
-
-    // unblock follower so that the transaction can be replicated and 
committed to all.
-    LOG.info("unblock follower {}", blockedFollower.getId());
-    
SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
-    for(int i = 0; i < numMessages; i++) {
+  static void checkAll(List<CompletableFuture<WatchReplies>> watches, Logger 
LOG) throws Exception {
+    for(int i = 0; i < watches.size(); i++) {
       final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
       final long logIndex = watchReplies.logIndex;
-      LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex);
+      LOG.info("checkAll {}: logIndex={}", i, logIndex);
       final RaftClientReply watchAllReply = 
watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply);
       Assert.assertTrue(watchAllReply.isSuccess());
@@ -255,11 +255,11 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
       Assert.assertTrue(watchAllCommittedReply.isSuccess());
       { // check commit infos
         final Collection<CommitInfoProto> commitInfos = 
watchAllCommittedReply.getCommitInfos();
+        final String message = "logIndex=" + logIndex + ", " + 
ProtoUtils.toString(commitInfos);
         Assert.assertEquals(NUM_SERVERS, commitInfos.size());
-        commitInfos.forEach(info -> Assert.assertTrue(logIndex <= 
info.getCommitIndex()));
+        commitInfos.forEach(info -> Assert.assertTrue(message, logIndex <= 
info.getCommitIndex()));
       }
     }
-    return null;
   }
 
   static <T> void assertNotDone(List<CompletableFuture<T>> futures) {
@@ -280,11 +280,8 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
 
   @Test
   public void testWatchRequestAsyncChangeLeader() throws Exception {
-    LOG.info("Running testWatchRequestAsyncChangeLeader");
-    try(final CLUSTER cluster = newCluster(NUM_SERVERS)) {
-      cluster.start();
-      runTest(WatchRequestTests::runTestWatchRequestAsyncChangeLeader, 
cluster, LOG);
-    }
+    runWithNewCluster(NUM_SERVERS,
+        cluster -> 
runTest(WatchRequestTests::runTestWatchRequestAsyncChangeLeader, cluster, LOG));
   }
 
   static Void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws 
Exception {
@@ -307,37 +304,8 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     Assert.assertEquals(numMessages, watches.size());
 
     // since only one follower is blocked, requests can be committed MAJORITY 
but neither ALL nor ALL_COMMITTED.
-    for(int i = 0; i < numMessages; i++) {
-      final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
-      final long logIndex = reply.getLogIndex();
-      LOG.info("UNBLOCK_F1 {}: reply logIndex={}", i, logIndex);
-      Assert.assertTrue(reply.isSuccess());
+    checkMajority(replies, watches, LOG);
 
-      final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
-      Assert.assertEquals(logIndex, watchReplies.logIndex);
-      final RaftClientReply watchMajorityReply = 
watchReplies.majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply);
-      Assert.assertTrue(watchMajorityReply.isSuccess());
-
-      final RaftClientReply watchMajorityCommittedReply
-          = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
-      LOG.info("watchMajorityCommittedReply({}) = ", logIndex, 
watchMajorityCommittedReply);
-      Assert.assertTrue(watchMajorityCommittedReply.isSuccess());
-      { // check commit infos
-        final Collection<CommitInfoProto> commitInfos = 
watchMajorityCommittedReply.getCommitInfos();
-        LOG.info("commitInfos=" + commitInfos);
-        Assert.assertEquals(NUM_SERVERS, commitInfos.size());
-
-        // One follower has not committed, so min must be less than logIndex
-        Assert.assertTrue(commitInfos.stream()
-            .map(CommitInfoProto::getCommitIndex).min(Long::compare).get() < 
logIndex);
-
-        // All other followers have committed
-        commitInfos.stream()
-            .map(CommitInfoProto::getCommitIndex).sorted(Long::compare)
-            .skip(1).forEach(ci -> Assert.assertTrue(logIndex <= ci));
-      }
-    }
     TimeUnit.SECONDS.sleep(1);
     assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> 
w.all));
     assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> 
w.allCommitted));
@@ -348,23 +316,7 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     // unblock follower so that the transaction can be replicated and 
committed to all.
     
SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
     LOG.info("unblock follower {}", blockedFollower.getId());
-    for(int i = 0; i < numMessages; i++) {
-      final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
-      final long logIndex = watchReplies.logIndex;
-      LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex);
-      final RaftClientReply watchAllReply = 
watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply);
-      Assert.assertTrue(watchAllReply.isSuccess());
-
-      final RaftClientReply watchAllCommittedReply = 
watchReplies.allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      LOG.info("watchAllCommittedReply({}) = {}", logIndex, 
watchAllCommittedReply);
-      Assert.assertTrue(watchAllCommittedReply.isSuccess());
-      { // check commit infos
-        final Collection<CommitInfoProto> commitInfos = 
watchAllCommittedReply.getCommitInfos();
-        Assert.assertEquals(NUM_SERVERS, commitInfos.size());
-        commitInfos.forEach(info -> Assert.assertTrue(logIndex <= 
info.getCommitIndex()));
-      }
-    }
+    checkAll(watches, LOG);
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index ee9008a..b18f794 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -17,11 +17,13 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.log4j.Level;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -35,6 +37,10 @@ import java.util.stream.Stream;
 public class RaftServerTestUtil {
   static final Logger LOG = LoggerFactory.getLogger(RaftServerTestUtil.class);
 
+  public static void setWatchRequestsLogLevel(Level level) {
+    LogUtils.setLogLevel(WatchRequests.LOG, level);
+  }
+
   public static void waitAndCheckNewConf(MiniRaftCluster cluster,
       RaftPeer[] peers, int numOfRemovedPeers, Collection<String> deadPeers)
       throws Exception {


Reply via email to