Repository: incubator-ratis Updated Branches: refs/heads/master 2fb45da94 -> 96a3b1bfe
RATIS-409. Watch requests may not work if there is a conf change. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/96a3b1bf Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/96a3b1bf Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/96a3b1bf Branch: refs/heads/master Commit: 96a3b1bfeda319e8543da3dbcd2afadd5468a803 Parents: 2fb45da Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Sat Nov 17 15:52:52 2018 -0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Sat Nov 17 15:52:52 2018 -0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/util/JavaUtils.java | 5 + .../ratis/grpc/server/GrpcLogAppender.java | 18 ++- .../grpc/server/GrpcServerProtocolService.java | 4 + .../apache/ratis/server/impl/FollowerInfo.java | 65 ++++++---- .../apache/ratis/server/impl/LeaderState.java | 92 ++++++++----- .../apache/ratis/server/impl/LogAppender.java | 5 +- .../ratis/server/impl/RaftServerImpl.java | 6 +- .../ratis/server/storage/RaftLogIndex.java | 76 +++++++++++ .../java/org/apache/ratis/MiniRaftCluster.java | 15 +++ .../org/apache/ratis/WatchRequestTests.java | 130 ++++++------------- .../ratis/server/impl/RaftServerTestUtil.java | 6 + 11 files changed, 262 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java index 95fcf35..f3b0a0d 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java @@ -67,6 +67,11 @@ public interface JavaUtils { return t; } + static StackTraceElement getCallerStackTraceElement() { + final StackTraceElement[] trace = Thread.currentThread().getStackTrace(); + return trace[3]; + } + /** * Invoke {@link Callable#call()} and, if there any, * wrap the checked exception by {@link RuntimeException}. http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 11ff4d8..ed96468 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -206,10 +206,19 @@ public class GrpcLogAppender extends LogAppender { */ @Override public void onNext(AppendEntriesReplyProto reply) { - LOG.debug("{} received {} response from {}", server.getId(), - (!firstResponseReceived ? "the first" : "a"), - follower.getPeer()); + if (LOG.isDebugEnabled()) { + LOG.debug("{}<-{}: received {} reply {} ", server.getId(), follower.getPeer(), + (!firstResponseReceived? "the first": "a"), ServerProtoUtils.toString(reply)); + } + + try { + onNextImpl(reply); + } catch(Throwable t) { + LOG.error("Failed onNext " + reply, t); + } + } + private void onNextImpl(AppendEntriesReplyProto reply) { // update the last rpc time follower.updateLastRpcResponseTime(); @@ -428,8 +437,7 @@ public class GrpcLogAppender extends LogAppender { } if (responseHandler.hasAllResponse()) { - follower.updateMatchIndex(snapshot.getTermIndex().getIndex()); - follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1); + follower.setSnapshotIndex(snapshot.getTermIndex().getIndex()); LOG.info("{}: install snapshot-{} successfully on follower {}", server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer()); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java index 12a717a..5c5bd66 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java @@ -20,6 +20,7 @@ package org.apache.ratis.grpc.server; import org.apache.ratis.grpc.GrpcUtil; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.proto.RaftProtos.*; import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase; @@ -76,6 +77,9 @@ public class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase server.appendEntriesAsync(request).thenCombine(previous, (reply, v) -> { if (!isClosed.get()) { + if (LOG.isDebugEnabled()) { + LOG.debug(server.getId() + ": reply " + ServerProtoUtils.toString(reply)); + } responseObserver.onNext(reply); } current.complete(null); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java index 254319a..94bd7c4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java @@ -18,71 +18,84 @@ package org.apache.ratis.server.impl; import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.util.Preconditions; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.storage.RaftLogIndex; import org.apache.ratis.util.Timestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; public class FollowerInfo { + public static final Logger LOG = LoggerFactory.getLogger(FollowerInfo.class); + + private final String name; + private final Consumer<Object> infoIndexChange; + private final Consumer<Object> debugIndexChange; + private final RaftPeer peer; private final AtomicReference<Timestamp> lastRpcResponseTime; private final AtomicReference<Timestamp> lastRpcSendTime; - private long nextIndex; - private final AtomicLong matchIndex; - private final AtomicLong commitIndex = new AtomicLong(RaftServerConstants.INVALID_LOG_INDEX); + private final RaftLogIndex nextIndex; + private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", 0L); + private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftServerConstants.INVALID_LOG_INDEX); private volatile boolean attendVote; private final int rpcSlownessTimeoutMs; - FollowerInfo(RaftPeer peer, Timestamp lastRpcTime, long nextIndex, + + FollowerInfo(RaftPeerId id, RaftPeer peer, Timestamp lastRpcTime, long nextIndex, boolean attendVote, int rpcSlownessTimeoutMs) { + this.name = id + "->" + peer.getId(); + this.infoIndexChange = s -> LOG.info("{}: {}", name, s); + this.debugIndexChange = s -> LOG.debug("{}: {}", name, s); + this.peer = peer; this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime); this.lastRpcSendTime = new AtomicReference<>(lastRpcTime); - this.nextIndex = nextIndex; - this.matchIndex = new AtomicLong(0); + this.nextIndex = new RaftLogIndex("nextIndex", nextIndex); this.attendVote = attendVote; this.rpcSlownessTimeoutMs = rpcSlownessTimeoutMs; } - public void updateMatchIndex(final long matchIndex) { - this.matchIndex.set(matchIndex); - } - public long getMatchIndex() { return matchIndex.get(); } + public void updateMatchIndex(long newMatchIndex) { + matchIndex.updateIncreasingly(newMatchIndex, debugIndexChange); + } + /** @return the commit index acked by the follower. */ long getCommitIndex() { return commitIndex.get(); } boolean updateCommitIndex(long newCommitIndex) { - final long old = commitIndex.getAndUpdate(oldCommitIndex -> newCommitIndex); - Preconditions.assertTrue(newCommitIndex >= old, - () -> "newCommitIndex = " + newCommitIndex + " < old = " + old); - return old != newCommitIndex; + return commitIndex.updateToMax(newCommitIndex, debugIndexChange); + } + + public long getNextIndex() { + return nextIndex.get(); } - public synchronized long getNextIndex() { - return nextIndex; + public void updateNextIndex(long newNextIndex) { + nextIndex.updateIncreasingly(newNextIndex, debugIndexChange); } - public synchronized void updateNextIndex(long i) { - nextIndex = i; + public void decreaseNextIndex(long newNextIndex) { + nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1, newNextIndex), infoIndexChange); } - public synchronized void decreaseNextIndex(long targetIndex) { - if (nextIndex > 0) { - nextIndex = Math.min(nextIndex - 1, targetIndex); - } + public void setSnapshotIndex(long snapshotIndex) { + matchIndex.setUnconditionally(snapshotIndex, infoIndexChange); + nextIndex.setUnconditionally(snapshotIndex + 1, infoIndexChange); } @Override public String toString() { - return peer.getId() + "(next=" + nextIndex + ", match=" + matchIndex + "," + - " attendVote=" + attendVote + + return name + "(c" + getCommitIndex() + ",m" + getMatchIndex() + ",n" + getNextIndex() + + ", attendVote=" + attendVote + ", lastRpcSendTime=" + lastRpcSendTime.get().elapsedTimeMs() + ", lastRpcResponseTime=" + lastRpcResponseTime.get().elapsedTimeMs() + ")"; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index c4c4fe2..46c3ac1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -38,9 +38,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.LongSupplier; import java.util.function.Predicate; +import java.util.function.ToLongFunction; import java.util.stream.Collectors; -import java.util.stream.LongStream; import java.util.stream.Stream; /** @@ -302,18 +303,14 @@ public class LeaderState { } void commitIndexChanged() { - final long[] commitIndices = LongStream.concat(LongStream.of( - raftLog.getLastCommittedIndex()), senders.stream() - .map(LogAppender::getFollower) - .mapToLong(FollowerInfo::getCommitIndex)) - .sorted().toArray(); - - // Normally, leader commit index is always ahead followers. - // However, after a leader change, the new leader commit index may - // be behind some followers in the beginning. - watchRequests.update(ReplicationLevel.MAJORITY, commitIndices[commitIndices.length-1]); - watchRequests.update(ReplicationLevel.ALL_COMMITTED, commitIndices[0]); - watchRequests.update(ReplicationLevel.MAJORITY_COMMITTED, getMajority(commitIndices)); + getMajorityMin(FollowerInfo::getCommitIndex, raftLog::getLastCommittedIndex).ifPresent(m -> { + // Normally, leader commit index is always ahead followers. + // However, after a leader change, the new leader commit index may + // be behind some followers in the beginning. + watchRequests.update(ReplicationLevel.ALL_COMMITTED, m.min); + watchRequests.update(ReplicationLevel.MAJORITY_COMMITTED, m.majority); + watchRequests.update(ReplicationLevel.MAJORITY, m.max); + }); } private void applyOldNewConf() { @@ -503,37 +500,71 @@ public class LeaderState { eventQueue.submit(UPDATE_COMMIT_EVENT); } + static class MinMajorityMax { + private final long min; + private final long majority; + private final long max; + + MinMajorityMax(long min, long majority, long max) { + this.min = min; + this.majority = majority; + this.max = max; + } + + MinMajorityMax combine(MinMajorityMax that) { + return new MinMajorityMax( + Math.min(this.min, that.min), + Math.min(this.majority, that.majority), + Math.min(this.max, that.max)); + } + + static MinMajorityMax valueOf(long[] sorted) { + return new MinMajorityMax(sorted[0], getMajority(sorted), getMax(sorted)); + } + + static long getMajority(long[] sorted) { + return sorted[(sorted.length - 1) / 2]; + } + + static long getMax(long[] sorted) { + return sorted[sorted.length - 1]; + } + } + private void updateCommit() { + getMajorityMin(FollowerInfo::getMatchIndex, raftLog::getLatestFlushedIndex) + .ifPresent(m -> updateCommit(m.majority, m.min)); + } + + private Optional<MinMajorityMax> getMajorityMin(ToLongFunction<FollowerInfo> followerIndex, LongSupplier logIndex) { final RaftPeerId selfId = server.getId(); final RaftConfiguration conf = server.getRaftConf(); final List<FollowerInfo> followers = voterLists.get(0); final boolean includeSelf = conf.containsInConf(selfId); if (followers.isEmpty() && !includeSelf) { - return; + return Optional.empty(); } - final long[] indicesInNewConf = getSortedLogIndices(followers, includeSelf); - final long majorityInNewConf = getMajority(indicesInNewConf); - final long majority; - final long min; + final long[] indicesInNewConf = getSorted(followers, includeSelf, followerIndex, logIndex); + final MinMajorityMax newConf = MinMajorityMax.valueOf(indicesInNewConf); if (!conf.isTransitional()) { - majority = majorityInNewConf; - min = indicesInNewConf[0]; + return Optional.of(newConf); } else { // configuration is in transitional state final List<FollowerInfo> oldFollowers = voterLists.get(1); final boolean includeSelfInOldConf = conf.containsInOldConf(selfId); if (oldFollowers.isEmpty() && !includeSelfInOldConf) { - return; + return Optional.empty(); } - final long[] indicesInOldConf = getSortedLogIndices(oldFollowers, includeSelfInOldConf); - final long majorityInOldConf = getMajority(indicesInOldConf); - majority = Math.min(majorityInNewConf, majorityInOldConf); - min = Math.min(indicesInNewConf[0], indicesInOldConf[0]); + final long[] indicesInOldConf = getSorted(oldFollowers, includeSelfInOldConf, followerIndex, logIndex); + final MinMajorityMax oldConf = MinMajorityMax.valueOf(indicesInOldConf); + return Optional.of(newConf.combine(oldConf)); } + } + private void updateCommit(long majority, long min) { final long oldLastCommitted = raftLog.getLastCommittedIndex(); if (majority > oldLastCommitted) { // copy the entries out from the raftlog, in order to prevent that @@ -605,11 +636,8 @@ public class LeaderState { notifySenders(); } - static long getMajority(long[] indices) { - return indices[(indices.length - 1) / 2]; - } - - private long[] getSortedLogIndices(List<FollowerInfo> followers, boolean includeSelf) { + private static long[] getSorted(List<FollowerInfo> followers, boolean includeSelf, + ToLongFunction<FollowerInfo> getFollowerIndex, LongSupplier getLogIndex) { final int length = includeSelf ? followers.size() + 1 : followers.size(); if (length == 0) { throw new IllegalArgumentException("followers.size() == " @@ -617,11 +645,11 @@ public class LeaderState { } final long[] indices = new long[length]; for (int i = 0; i < followers.size(); i++) { - indices[i] = followers.get(i).getMatchIndex(); + indices[i] = getFollowerIndex.applyAsLong(followers.get(i)); } if (includeSelf) { // note that we also need to wait for the local disk I/O - indices[length - 1] = raftLog.getLatestFlushedIndex(); + indices[length - 1] = getLogIndex.getAsLong(); } Arrays.sort(indices); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index 02e0f56..6cb8538 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -262,7 +262,7 @@ public class LogAppender { protected void updateCommitIndex(long commitIndex) { if (follower.updateCommitIndex(commitIndex)) { - server.commitIndexChanged(); + leaderState.commitIndexChanged(); } } @@ -398,8 +398,7 @@ public class LogAppender { } if (reply != null) { - follower.updateMatchIndex(snapshot.getTermIndex().getIndex()); - follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1); + follower.setSnapshotIndex(snapshot.getTermIndex().getIndex()); LOG.info("{}: install snapshot-{} successfully on follower {}", server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer()); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 21c2b59..a67b4c5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -115,7 +115,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou LogAppender newLogAppender( LeaderState state, RaftPeer peer, Timestamp lastRpcTime, long nextIndex, boolean attendVote) { - final FollowerInfo f = new FollowerInfo(peer, lastRpcTime, nextIndex, attendVote, rpcSlownessTimeoutMs); + final FollowerInfo f = new FollowerInfo(getId(), peer, lastRpcTime, nextIndex, attendVote, rpcSlownessTimeoutMs); return getProxy().getFactory().newLogAppender(this, state, f); } @@ -1017,10 +1017,6 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou groupId, term, lastEntry); } - void commitIndexChanged() { - role.getLeaderState().ifPresent(LeaderState::commitIndexChanged); - } - public void submitUpdateCommitEvent() { role.getLeaderState().ifPresent(LeaderState::submitUpdateCommitEvent); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java new file mode 100644 index 0000000..a7dd1a7 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIndex.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.StringUtils; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.LongUnaryOperator; + +/** + * Indices of {@link RaftLog} such as commit index, match index, etc. + * + * This class is thread safe. + */ +public class RaftLogIndex { + private final Object name; + private final AtomicLong index; + + public RaftLogIndex(Object name, long initialValue) { + this.name = name; + this.index = new AtomicLong(initialValue); + } + + public long get() { + return index.get(); + } + + public boolean setUnconditionally(long newIndex, Consumer<Object> log) { + final long old = index.getAndSet(newIndex); + log.accept(StringUtils.stringSupplierAsObject(() -> name + ": setUnconditionally " + old + " -> " + newIndex)); + return old != newIndex; + } + + public boolean updateUnconditionally(LongUnaryOperator update, Consumer<Object> log) { + final long old = index.getAndUpdate(update); + final long newIndex = update.applyAsLong(old); + log.accept(StringUtils.stringSupplierAsObject(() -> name + ": updateUnconditionally " + old + " -> " + newIndex)); + return old != newIndex; + } + + public boolean updateIncreasingly(long newIndex, Consumer<Object> log) { + final long old = index.getAndSet(newIndex); + Preconditions.assertTrue(old <= newIndex, + () -> "Failed to updateIncreasingly for " + name + ": " + old + " -> " + newIndex); + log.accept(StringUtils.stringSupplierAsObject(() -> name + ": updateIncreasingly " + old + " -> " + newIndex)); + return old != newIndex; + } + + public boolean updateToMax(long newIndex, Consumer<Object> log) { + final long old = index.getAndUpdate(oldIndex -> Math.max(oldIndex, newIndex)); + log.accept(StringUtils.stringSupplierAsObject(() -> name + ": updateToMax " + old + " -> " + newIndex)); + return old != newIndex; + } + + @Override + public String toString() { + return name + ":" + index; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 0cf9449..9d08bd0 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -79,6 +79,21 @@ public abstract class MiniRaftCluster implements Closeable { default CLUSTER newCluster(int numPeers) throws IOException { return getFactory().newCluster(numPeers, getProperties()); } + + default void runWithNewCluster(int numServers, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception { + final StackTraceElement caller = JavaUtils.getCallerStackTraceElement(); + LOG.info("Running " + caller.getMethodName()); + final CLUSTER cluster = newCluster(numServers); + try { + cluster.start(); + testCase.accept(cluster); + } catch(Throwable t) { + LOG.error("Failed " + caller + ": " + cluster.printServers(), t); + throw t; + } finally { + cluster.shutdown(); + } + } } public abstract CLUSTER newCluster( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java index d8856e7..dbe5865 100644 --- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java @@ -25,10 +25,12 @@ import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.CheckedFunction; import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.TimeDuration; import org.junit.Assert; import org.junit.Before; @@ -41,15 +43,14 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import java.util.stream.Stream; public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> extends BaseTest implements MiniRaftCluster.Factory.Get<CLUSTER> { - static { + { + RaftServerTestUtil.setWatchRequestsLogLevel(Level.DEBUG); LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } static final int NUM_SERVERS = 3; @@ -64,11 +65,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> @Test public void testWatchRequestAsync() throws Exception { - LOG.info("Running testWatchRequests"); - try(final CLUSTER cluster = newCluster(NUM_SERVERS)) { - cluster.start(); - runTest(WatchRequestTests::runTestWatchRequestAsync, cluster, LOG); - } + runWithNewCluster(NUM_SERVERS, cluster -> runTest(WatchRequestTests::runTestWatchRequestAsync, cluster, LOG)); } static class TestParameters { @@ -123,14 +120,6 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> } } - static long getLogIndex(RaftClient writeClient) throws Exception { - // send a message in order to get the current log index - final RaftTestUtil.SimpleMessage message = new RaftTestUtil.SimpleMessage("getLogIndex"); - final RaftClientReply reply = writeClient.sendAsync(message).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - Assert.assertTrue(reply.isSuccess()); - return reply.getLogIndex(); - } - static void runTest(CheckedFunction<TestParameters, Void, Exception> testCase, MiniRaftCluster cluster, Logger LOG) throws Exception { try(final RaftClient writeClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); final RaftClient watchMajorityClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); @@ -146,7 +135,6 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> LOG.info("{}) {}, {}", i, p, cluster.printServers()); testCase.apply(p); } - } } @@ -201,10 +189,29 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> // unblock leader so that the transaction can be committed. SimpleStateMachine4Testing.get(leader).unblockStartTransaction(); LOG.info("unblock leader {}", leader.getId()); - for(int i = 0; i < numMessages; i++) { + + checkMajority(replies, watches, LOG); + + Assert.assertEquals(numMessages, watches.size()); + + // but not replicated/committed to all. + TimeUnit.SECONDS.sleep(1); + assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.all)); + assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted)); + + // unblock follower so that the transaction can be replicated and committed to all. + LOG.info("unblock follower {}", blockedFollower.getId()); + SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData(); + checkAll(watches, LOG); + return null; + } + + static void checkMajority(List<CompletableFuture<RaftClientReply>> replies, + List<CompletableFuture<WatchReplies>> watches, Logger LOG) throws Exception { + for(int i = 0; i < replies.size(); i++) { final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + LOG.info("checkMajority {}: receive {}", i, reply); final long logIndex = reply.getLogIndex(); - LOG.info("{}: receive reply for logIndex={}", i, logIndex); Assert.assertTrue(reply.isSuccess()); final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); @@ -215,37 +222,30 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final RaftClientReply watchMajorityCommittedReply = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - LOG.info("watchMajorityCommittedReply({}) = ", logIndex, watchMajorityCommittedReply); + LOG.info("watchMajorityCommittedReply({}) = {}", logIndex, watchMajorityCommittedReply); Assert.assertTrue(watchMajorityCommittedReply.isSuccess()); { // check commit infos final Collection<CommitInfoProto> commitInfos = watchMajorityCommittedReply.getCommitInfos(); + final String message = "logIndex=" + logIndex + ", " + ProtoUtils.toString(commitInfos); Assert.assertEquals(NUM_SERVERS, commitInfos.size()); // One follower has not committed, so min must be less than logIndex - Assert.assertTrue(commitInfos.stream() - .map(CommitInfoProto::getCommitIndex).min(Long::compare).get() < logIndex); + final long min = commitInfos.stream().map(CommitInfoProto::getCommitIndex).min(Long::compare).get(); + Assert.assertTrue(message, logIndex > min); // All other followers have committed commitInfos.stream() .map(CommitInfoProto::getCommitIndex).sorted(Long::compare) - .skip(1).forEach(ci -> Assert.assertTrue(logIndex <= ci)); + .skip(1).forEach(ci -> Assert.assertTrue(message, logIndex <= ci)); } } + } - Assert.assertEquals(numMessages, watches.size()); - - // but not replicated/committed to all. - TimeUnit.SECONDS.sleep(1); - assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.all)); - assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted)); - - // unblock follower so that the transaction can be replicated and committed to all. - LOG.info("unblock follower {}", blockedFollower.getId()); - SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData(); - for(int i = 0; i < numMessages; i++) { + static void checkAll(List<CompletableFuture<WatchReplies>> watches, Logger LOG) throws Exception { + for(int i = 0; i < watches.size(); i++) { final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); final long logIndex = watchReplies.logIndex; - LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex); + LOG.info("checkAll {}: logIndex={}", i, logIndex); final RaftClientReply watchAllReply = watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply); Assert.assertTrue(watchAllReply.isSuccess()); @@ -255,11 +255,11 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> Assert.assertTrue(watchAllCommittedReply.isSuccess()); { // check commit infos final Collection<CommitInfoProto> commitInfos = watchAllCommittedReply.getCommitInfos(); + final String message = "logIndex=" + logIndex + ", " + ProtoUtils.toString(commitInfos); Assert.assertEquals(NUM_SERVERS, commitInfos.size()); - commitInfos.forEach(info -> Assert.assertTrue(logIndex <= info.getCommitIndex())); + commitInfos.forEach(info -> Assert.assertTrue(message, logIndex <= info.getCommitIndex())); } } - return null; } static <T> void assertNotDone(List<CompletableFuture<T>> futures) { @@ -280,11 +280,8 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> @Test public void testWatchRequestAsyncChangeLeader() throws Exception { - LOG.info("Running testWatchRequestAsyncChangeLeader"); - try(final CLUSTER cluster = newCluster(NUM_SERVERS)) { - cluster.start(); - runTest(WatchRequestTests::runTestWatchRequestAsyncChangeLeader, cluster, LOG); - } + runWithNewCluster(NUM_SERVERS, + cluster -> runTest(WatchRequestTests::runTestWatchRequestAsyncChangeLeader, cluster, LOG)); } static Void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws Exception { @@ -307,37 +304,8 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> Assert.assertEquals(numMessages, watches.size()); // since only one follower is blocked, requests can be committed MAJORITY but neither ALL nor ALL_COMMITTED. - for(int i = 0; i < numMessages; i++) { - final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - final long logIndex = reply.getLogIndex(); - LOG.info("UNBLOCK_F1 {}: reply logIndex={}", i, logIndex); - Assert.assertTrue(reply.isSuccess()); + checkMajority(replies, watches, LOG); - final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - Assert.assertEquals(logIndex, watchReplies.logIndex); - final RaftClientReply watchMajorityReply = watchReplies.majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply); - Assert.assertTrue(watchMajorityReply.isSuccess()); - - final RaftClientReply watchMajorityCommittedReply - = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - LOG.info("watchMajorityCommittedReply({}) = ", logIndex, watchMajorityCommittedReply); - Assert.assertTrue(watchMajorityCommittedReply.isSuccess()); - { // check commit infos - final Collection<CommitInfoProto> commitInfos = watchMajorityCommittedReply.getCommitInfos(); - LOG.info("commitInfos=" + commitInfos); - Assert.assertEquals(NUM_SERVERS, commitInfos.size()); - - // One follower has not committed, so min must be less than logIndex - Assert.assertTrue(commitInfos.stream() - .map(CommitInfoProto::getCommitIndex).min(Long::compare).get() < logIndex); - - // All other followers have committed - commitInfos.stream() - .map(CommitInfoProto::getCommitIndex).sorted(Long::compare) - .skip(1).forEach(ci -> Assert.assertTrue(logIndex <= ci)); - } - } TimeUnit.SECONDS.sleep(1); assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.all)); assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted)); @@ -348,23 +316,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> // unblock follower so that the transaction can be replicated and committed to all. SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData(); LOG.info("unblock follower {}", blockedFollower.getId()); - for(int i = 0; i < numMessages; i++) { - final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - final long logIndex = watchReplies.logIndex; - LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex); - final RaftClientReply watchAllReply = watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply); - Assert.assertTrue(watchAllReply.isSuccess()); - - final RaftClientReply watchAllCommittedReply = watchReplies.allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - LOG.info("watchAllCommittedReply({}) = {}", logIndex, watchAllCommittedReply); - Assert.assertTrue(watchAllCommittedReply.isSuccess()); - { // check commit infos - final Collection<CommitInfoProto> commitInfos = watchAllCommittedReply.getCommitInfos(); - Assert.assertEquals(NUM_SERVERS, commitInfos.size()); - commitInfos.forEach(info -> Assert.assertTrue(logIndex <= info.getCommitIndex())); - } - } + checkAll(watches, LOG); return null; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/96a3b1bf/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index ee9008a..b18f794 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -17,11 +17,13 @@ */ package org.apache.ratis.server.impl; +import org.apache.log4j.Level; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.TimeDuration; import org.junit.Assert; import org.slf4j.Logger; @@ -35,6 +37,10 @@ import java.util.stream.Stream; public class RaftServerTestUtil { static final Logger LOG = LoggerFactory.getLogger(RaftServerTestUtil.class); + public static void setWatchRequestsLogLevel(Level level) { + LogUtils.setLogLevel(WatchRequests.LOG, level); + } + public static void waitAndCheckNewConf(MiniRaftCluster cluster, RaftPeer[] peers, int numOfRemovedPeers, Collection<String> deadPeers) throws Exception {
