Repository: incubator-ratis Updated Branches: refs/heads/master a56bfcebd -> d72d9c6eb
RATIS-359. Add timeout support for Watch requests. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/d72d9c6e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/d72d9c6e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/d72d9c6e Branch: refs/heads/master Commit: d72d9c6eb8cb32fa4058b5ab5710e9cd45e2653d Parents: a56bfce Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Tue Dec 4 17:20:11 2018 -0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Tue Dec 4 17:20:11 2018 -0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/conf/ConfUtils.java | 9 ++ .../ratis/protocol/NotReplicatedException.java | 6 +- .../apache/ratis/protocol/RaftClientReply.java | 6 + .../java/org/apache/ratis/util/StringUtils.java | 19 +++ .../org/apache/ratis/util/TimeDuration.java | 23 +++ .../java/org/apache/ratis/util/Timestamp.java | 12 +- .../ratis/server/RaftServerConfigKeys.java | 25 +++- .../apache/ratis/server/impl/LeaderState.java | 16 +- .../ratis/server/impl/RaftServerImpl.java | 2 +- .../apache/ratis/server/impl/WatchRequests.java | 113 ++++++++++---- .../org/apache/ratis/WatchRequestTests.java | 150 ++++++++++++++++--- .../org/apache/ratis/util/TestTimeDuration.java | 18 ++- 12 files changed, 338 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java index 3ffd8be..98ee162 100644 --- a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java @@ -94,6 +94,15 @@ public interface ConfUtils { }; } + static BiConsumer<String, TimeDuration> requirePositive() { + return (key, value) -> { + if (value.getDuration() <= 0) { + throw new IllegalArgumentException( + key + " = " + value + " is non-positive."); + } + }; + } + static BiFunction<String, Long, Integer> requireInt() { return (key, value) -> { try { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java index 0a85b8e..c8643d7 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -25,8 +25,8 @@ public class NotReplicatedException extends RaftException { private final long logIndex; public NotReplicatedException(long callId, ReplicationLevel requiredReplication, long logIndex) { - super("Request with call Id " + callId + " is committed with log index " + logIndex - + " but not yet replicated to " + requiredReplication); + super("Request with call Id " + callId + " and log index " + logIndex + + " is not yet replicated to " + requiredReplication); this.callId = callId; this.requiredReplication = requiredReplication; this.logIndex = logIndex; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java index 7b3979b..7a9574f 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java @@ -86,6 +86,12 @@ public class RaftClientReply extends RaftClientMessage { request.getCallId(), true, message, null, 0L, commitInfos); } + public RaftClientReply(RaftClientRequest request, NotReplicatedException nre, + Collection<CommitInfoProto> commitInfos) { + this(request.getClientId(), request.getServerId(), request.getRaftGroupId(), + request.getCallId(), false, request.getMessage(), nre, nre.getLogIndex(), commitInfos); + } + public RaftClientReply(RaftClientReply reply, NotReplicatedException nre) { this(reply.getClientId(), reply.getServerId(), reply.getRaftGroupId(), reply.getCallId(), false, reply.getMessage(), nre, reply.getLogIndex(), reply.getCommitInfos()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java index be797a0..1a6d701 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; public class StringUtils { @@ -146,4 +147,22 @@ public class StringUtils { return b.append("\n}").toString(); } } + + public static String completableFuture2String(CompletableFuture<?> future, boolean includeDetails) { + if (!future.isDone()) { + return "NOT_DONE"; + } else if (future.isCancelled()) { + return "CANCELLED"; + } else if (future.isCompletedExceptionally()) { + if (!includeDetails) { + return "EXCEPTION"; + } + return future.thenApply(Objects::toString).exceptionally(Throwable::toString).join(); + } else { + if (!includeDetails) { + return "COMPLETED"; + } + return "" + future.join(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java index 2fad806..7daa4dd 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java @@ -126,6 +126,29 @@ public class TimeDuration implements Comparable<TimeDuration> { return Math.toIntExact(toLong(targetUnit)); } + public TimeDuration to(TimeUnit targetUnit) { + return valueOf(toLong(targetUnit), targetUnit); + } + + /** Round up to the given nanos to nearest multiple (in nanoseconds) of this {@link TimeDuration}. */ + public long roundUp(long nanos) { + if (duration <= 0) { + throw new ArithmeticException( + "Rounding up to a non-positive " + getClass().getSimpleName() + " (=" + this + ")"); + } + + final long divisor = unit.toNanos(duration); + if (nanos == 0 || divisor == 1) { + return nanos; + } + + long remainder = nanos % divisor; // In Java, the sign of remainder is the same as the dividend. + if (remainder > 0) { + remainder -= divisor; + } + return nanos - remainder; + } + /** * Apply the given operator to the duration value of this object. * http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java index 96e2a57..8ab3f6b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -29,6 +29,16 @@ public class Timestamp implements Comparable<Timestamp> { private static final long START_TIME = System.nanoTime(); + /** @return a {@link Timestamp} for the given nanos. */ + public static Timestamp valueOf(long nanos) { + return new Timestamp(nanos); + } + + /** @return a long in nanos for the current time. */ + public static long currentTimeNanos() { + return System.nanoTime(); + } + /** @return the latest timestamp. */ public static Timestamp latest(Timestamp a, Timestamp b) { return a.compareTo(b) > 0? a: b; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index ed08ca9..25d4b0c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -79,6 +79,29 @@ public interface RaftServerConfigKeys { } + String WATCH_TIMEOUT_DENOMINATION_KEY = PREFIX + ".watch.timeout.denomination"; + TimeDuration WATCH_TIMEOUT_DENOMINATION_DEFAULT = TimeDuration.valueOf(1, TimeUnit.SECONDS); + static TimeDuration watchTimeoutDenomination(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(WATCH_TIMEOUT_DENOMINATION_DEFAULT.getUnit()), + WATCH_TIMEOUT_DENOMINATION_KEY, WATCH_TIMEOUT_DENOMINATION_DEFAULT, getDefaultLog(), requirePositive()); + } + static void setWatchTimeoutDenomination(RaftProperties properties, TimeDuration watchTimeout) { + setTimeDuration(properties::setTimeDuration, WATCH_TIMEOUT_DENOMINATION_KEY, watchTimeout); + } + + /** + * Timeout for watch requests. + */ + String WATCH_TIMEOUT_KEY = PREFIX + ".watch.timeout"; + TimeDuration WATCH_TIMEOUT_DEFAULT = TimeDuration.valueOf(10, TimeUnit.SECONDS); + static TimeDuration watchTimeout(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(WATCH_TIMEOUT_DEFAULT.getUnit()), + WATCH_TIMEOUT_KEY, WATCH_TIMEOUT_DEFAULT, getDefaultLog(), requirePositive()); + } + static void setWatchTimeout(RaftProperties properties, TimeDuration watchTimeout) { + setTimeDuration(properties::setTimeDuration, WATCH_TIMEOUT_KEY, watchTimeout); + } + interface Log { String PREFIX = RaftServerConfigKeys.PREFIX + ".log"; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index 1bc6e79..924a7d0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -35,6 +35,7 @@ import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -199,7 +200,7 @@ public class LeaderState { this.currentTerm = state.getCurrentTerm(); processor = new EventProcessor(); this.pendingRequests = new PendingRequests(server.getId()); - this.watchRequests = new WatchRequests(server); + this.watchRequests = new WatchRequests(server.getId(), properties); final RaftConfiguration conf = server.getRaftConf(); Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId()); @@ -299,9 +300,18 @@ public class LeaderState { return pendingRequests.add(request, entry); } - CompletableFuture<Void> addWatchReqeust(RaftClientRequest request) { + CompletableFuture<RaftClientReply> addWatchReqeust(RaftClientRequest request) { LOG.debug("{}: addWatchRequest {}", server.getId(), request); - return watchRequests.add(request.getType().getWatch()); + return watchRequests.add(request) + .thenApply(v -> new RaftClientReply(request, server.getCommitInfos())) + .exceptionally(e -> { + e = JavaUtils.unwrapCompletionException(e); + if (e instanceof NotReplicatedException) { + return new RaftClientReply(request, (NotReplicatedException)e, server.getCommitInfos()); + } else { + throw new CompletionException(e); + } + }); } void commitIndexChanged() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 4ea78ce..ed74866 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -556,7 +556,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request) { return role.getLeaderState() - .map(ls -> ls.addWatchReqeust(request).thenApply(v -> new RaftClientReply(request, getCommitInfos()))) + .map(ls -> ls.addWatchReqeust(request)) .orElseGet(() -> CompletableFuture.completedFuture( new RaftClientReply(request, generateNotLeaderException(), getCommitInfos()))); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java index b7d6635..912d12d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,10 +17,13 @@ */ package org.apache.ratis.server.impl; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto; +import org.apache.ratis.protocol.NotReplicatedException; import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.util.Preconditions; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,37 +31,48 @@ import java.util.Arrays; import java.util.Comparator; import java.util.EnumMap; import java.util.Map; -import java.util.PriorityQueue; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; class WatchRequests { public static final Logger LOG = LoggerFactory.getLogger(WatchRequests.class); static class PendingWatch { private final WatchRequestTypeProto watch; - private final CompletableFuture<Void> future = new CompletableFuture<>(); + private final Timestamp creationTime; + private final Supplier<CompletableFuture<Void>> future = JavaUtils.memoize(CompletableFuture::new); - PendingWatch(WatchRequestTypeProto watch) { + PendingWatch(WatchRequestTypeProto watch, Timestamp creationTime) { this.watch = watch; + this.creationTime = creationTime; } CompletableFuture<Void> getFuture() { - return future; + return future.get(); } long getIndex() { return watch.getIndex(); } + Timestamp getCreationTime() { + return creationTime; + } + @Override public String toString() { - return RaftClientRequest.Type.toString(watch); + return RaftClientRequest.Type.toString(watch) + "@" + creationTime + + "?" + StringUtils.completableFuture2String(future.get(), true); } } private class WatchQueue { private final ReplicationLevel replication; - private final PriorityQueue<PendingWatch> q = new PriorityQueue<>(Comparator.comparing(PendingWatch::getIndex)); + private final SortedMap<PendingWatch, PendingWatch> q = new TreeMap<>( + Comparator.comparingLong(PendingWatch::getIndex).thenComparing(PendingWatch::getCreationTime)); private volatile long index; //Invariant: q.isEmpty() or index < any element q WatchQueue(ReplicationLevel replication) { @@ -69,13 +83,43 @@ class WatchRequests { return index; } - synchronized boolean offer(PendingWatch pending) { - if (pending.getIndex() > getIndex()) { // compare again synchronized - final boolean offered = q.offer(pending); - Preconditions.assertTrue(offered); - return true; + PendingWatch add(RaftClientRequest request) { + final long currentTime = Timestamp.currentTimeNanos(); + final long roundUp = watchTimeoutDenominationNanos.roundUp(currentTime); + final PendingWatch pending = new PendingWatch(request.getType().getWatch(), Timestamp.valueOf(roundUp)); + + synchronized (this) { + if (pending.getIndex() > getIndex()) { // compare again synchronized + final PendingWatch previous = q.putIfAbsent(pending, pending); + if (previous != null) { + return previous; + } + } else { + return null; + } + } + + final TimeDuration timeout = watchTimeoutNanos.apply(duration -> duration + roundUp - currentTime); + scheduler.onTimeout(timeout, () -> handleTimeout(request, pending), + LOG, () -> name + ": Failed to timeout " + request); + return pending; + } + + void handleTimeout(RaftClientRequest request, PendingWatch pending) { + if (removeExisting(pending)) { + pending.getFuture().completeExceptionally( + new NotReplicatedException(request.getCallId(), replication, pending.getIndex())); + LOG.debug("{}: timeout {}, {}", name, pending, request); } - return false; + } + + synchronized boolean removeExisting(PendingWatch pending) { + final PendingWatch removed = q.remove(pending); + if (removed == null) { + return false; + } + Preconditions.assertTrue(removed == pending); + return true; } synchronized void updateIndex(final long newIndex) { @@ -85,38 +129,53 @@ class WatchRequests { LOG.debug("{}: update {} index from {} to {}", name, replication, index, newIndex); index = newIndex; - for(;;) { - final PendingWatch peeked = q.peek(); - if (peeked == null || peeked.getIndex() > newIndex) { + for(; !q.isEmpty();) { + final PendingWatch first = q.firstKey(); + if (first.getIndex() > newIndex) { return; } - final PendingWatch polled = q.poll(); - Preconditions.assertTrue(polled == peeked); - LOG.debug("{}: complete {}", name, polled); - polled.getFuture().complete(null); + final boolean removed = removeExisting(first); + Preconditions.assertTrue(removed); + LOG.debug("{}: complete {}", name, first); + first.getFuture().complete(null); } } synchronized void failAll(Exception e) { - for(; !q.isEmpty(); ) { - q.poll().getFuture().completeExceptionally(e); + for(PendingWatch pending : q.values()) { + pending.getFuture().completeExceptionally(e); } + q.clear(); } } private final String name; private final Map<ReplicationLevel, WatchQueue> queues = new EnumMap<>(ReplicationLevel.class); - WatchRequests(Object name) { + private final TimeDuration watchTimeoutNanos; + private final TimeDuration watchTimeoutDenominationNanos; + private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(2); + + WatchRequests(Object name, RaftProperties properties) { this.name = name + "-" + getClass().getSimpleName(); + + final TimeDuration watchTimeout = RaftServerConfigKeys.watchTimeout(properties); + this.watchTimeoutNanos = watchTimeout.to(TimeUnit.NANOSECONDS); + final TimeDuration watchTimeoutDenomination = RaftServerConfigKeys.watchTimeoutDenomination(properties); + this.watchTimeoutDenominationNanos = watchTimeoutDenomination.to(TimeUnit.NANOSECONDS); + Preconditions.assertTrue(watchTimeoutNanos.getDuration() % watchTimeoutDenominationNanos.getDuration() == 0L, + () -> "watchTimeout (=" + watchTimeout + ") is not a multiple of watchTimeoutDenomination (=" + + watchTimeoutDenomination + ")."); + Arrays.stream(ReplicationLevel.values()).forEach(r -> queues.put(r, new WatchQueue(r))); } - CompletableFuture<Void> add(WatchRequestTypeProto watch) { + CompletableFuture<Void> add(RaftClientRequest request) { + final WatchRequestTypeProto watch = request.getType().getWatch(); final WatchQueue queue = queues.get(watch.getReplication()); if (watch.getIndex() > queue.getIndex()) { // compare without synchronization - final PendingWatch pending = new PendingWatch(watch); - if (queue.offer(pending)) { + final PendingWatch pending = queue.add(request); + if (pending != null) { return pending.getFuture(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java index dbe5865..a43c42b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,19 +19,17 @@ package org.apache.ratis; import org.apache.log4j.Level; import org.apache.ratis.client.RaftClient; -import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; +import org.apache.ratis.protocol.NotReplicatedException; import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.statemachine.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; -import org.apache.ratis.util.CheckedFunction; -import org.apache.ratis.util.LogUtils; -import org.apache.ratis.util.ProtoUtils; -import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.*; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -108,8 +106,8 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY), watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL), watchMajorityCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED), - watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED) - )); + watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED), + log)); }); } } @@ -120,7 +118,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> } } - static void runTest(CheckedFunction<TestParameters, Void, Exception> testCase, MiniRaftCluster cluster, Logger LOG) throws Exception { + static void runTest(CheckedConsumer<TestParameters, Exception> testCase, MiniRaftCluster cluster, Logger LOG) throws Exception { try(final RaftClient writeClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); final RaftClient watchMajorityClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); final RaftClient watchAllClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId()); @@ -133,7 +131,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> n, writeClient, watchMajorityClient, watchAllClient, watchMajorityCommittedClient, watchAllCommittedClient, cluster, LOG); LOG.info("{}) {}, {}", i, p, cluster.printServers()); - testCase.apply(p); + testCase.accept(p); } } } @@ -144,19 +142,45 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> private final CompletableFuture<RaftClientReply> all; private final CompletableFuture<RaftClientReply> majorityCommitted; private final CompletableFuture<RaftClientReply> allCommitted; + private final Logger log; WatchReplies(long logIndex, CompletableFuture<RaftClientReply> majority, CompletableFuture<RaftClientReply> all, - CompletableFuture<RaftClientReply> majorityCommitted, CompletableFuture<RaftClientReply> allCommitted) { + CompletableFuture<RaftClientReply> majorityCommitted, CompletableFuture<RaftClientReply> allCommitted, Logger log) { this.logIndex = logIndex; this.majority = majority; this.all = all; this.majorityCommitted = majorityCommitted; this.allCommitted = allCommitted; + this.log = log; + } + + RaftClientReply getMajority() throws Exception { + final RaftClientReply reply = majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + log.info("watchMajorityReply({}) = {}", logIndex, reply); + return reply; + } + + RaftClientReply getMajorityCommitted() throws Exception { + final RaftClientReply reply = majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + log.info("watchMajorityCommittedReply({}) = {}", logIndex, reply); + return reply; + } + + RaftClientReply getAll() throws Exception { + final RaftClientReply reply = all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + log.info("watchAllReply({}) = {}", logIndex, reply); + return reply; + } + + RaftClientReply getAllCommitted() throws Exception { + final RaftClientReply reply = allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + log.info("watchAllCommittedReply({}) = {}", logIndex, reply); + return reply; } } - static Void runTestWatchRequestAsync(TestParameters p) throws Exception { + static void runTestWatchRequestAsync(TestParameters p) throws Exception { final Logger LOG = p.log; final MiniRaftCluster cluster = p.cluster; final int numMessages = p.numMessages; @@ -203,7 +227,6 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> LOG.info("unblock follower {}", blockedFollower.getId()); SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData(); checkAll(watches, LOG); - return null; } static void checkMajority(List<CompletableFuture<RaftClientReply>> replies, @@ -216,13 +239,10 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); Assert.assertEquals(logIndex, watchReplies.logIndex); - final RaftClientReply watchMajorityReply = watchReplies.majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply); + final RaftClientReply watchMajorityReply = watchReplies.getMajority(); Assert.assertTrue(watchMajorityReply.isSuccess()); - final RaftClientReply watchMajorityCommittedReply - = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - LOG.info("watchMajorityCommittedReply({}) = {}", logIndex, watchMajorityCommittedReply); + final RaftClientReply watchMajorityCommittedReply = watchReplies.getMajorityCommitted(); Assert.assertTrue(watchMajorityCommittedReply.isSuccess()); { // check commit infos final Collection<CommitInfoProto> commitInfos = watchMajorityCommittedReply.getCommitInfos(); @@ -246,12 +266,10 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); final long logIndex = watchReplies.logIndex; LOG.info("checkAll {}: logIndex={}", i, logIndex); - final RaftClientReply watchAllReply = watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply); + final RaftClientReply watchAllReply = watchReplies.getAll(); Assert.assertTrue(watchAllReply.isSuccess()); - final RaftClientReply watchAllCommittedReply = watchReplies.allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); - LOG.info("watchAllCommittedReply({}) = {}", logIndex, watchAllCommittedReply); + final RaftClientReply watchAllCommittedReply = watchReplies.getAllCommitted(); Assert.assertTrue(watchAllCommittedReply.isSuccess()); { // check commit infos final Collection<CommitInfoProto> commitInfos = watchAllCommittedReply.getCommitInfos(); @@ -284,7 +302,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> cluster -> runTest(WatchRequestTests::runTestWatchRequestAsyncChangeLeader, cluster, LOG)); } - static Void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws Exception { + static void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws Exception { final Logger LOG = p.log; final MiniRaftCluster cluster = p.cluster; final int numMessages = p.numMessages; @@ -317,6 +335,90 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData(); LOG.info("unblock follower {}", blockedFollower.getId()); checkAll(watches, LOG); - return null; + } + + @Test + public void testWatchRequestTimeout() throws Exception { + final RaftProperties p = getProperties(); + RaftServerConfigKeys.setWatchTimeout(p, TimeDuration.valueOf(500, TimeUnit.MILLISECONDS)); + RaftServerConfigKeys.setWatchTimeoutDenomination(p, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS)); + try { + runWithNewCluster(NUM_SERVERS, + cluster -> runTest(WatchRequestTests::runTestWatchRequestTimeout, cluster, LOG)); + } finally { + RaftServerConfigKeys.setWatchTimeout(p, RaftServerConfigKeys.WATCH_TIMEOUT_DEFAULT); + RaftServerConfigKeys.setWatchTimeoutDenomination(p, RaftServerConfigKeys.WATCH_TIMEOUT_DENOMINATION_DEFAULT); + } + } + + static void runTestWatchRequestTimeout(TestParameters p) throws Exception { + final Logger LOG = p.log; + final MiniRaftCluster cluster = p.cluster; + final int numMessages = p.numMessages; + + final TimeDuration watchTimeout = RaftServerConfigKeys.watchTimeout(cluster.getProperties()); + final TimeDuration watchTimeoutDenomination = RaftServerConfigKeys.watchTimeoutDenomination(cluster.getProperties()); + + // blockStartTransaction of the leader so that no transaction can be committed MAJORITY + final RaftServerImpl leader = cluster.getLeader(); + LOG.info("block leader {}", leader.getId()); + SimpleStateMachine4Testing.get(leader).blockStartTransaction(); + + // blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED + final List<RaftServerImpl> followers = cluster.getFollowers(); + final RaftServerImpl blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size())); + LOG.info("block follower {}", blockedFollower.getId()); + SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData(); + + // send a message + final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>(); + final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>(); + + p.sendRequests(replies, watches); + + Assert.assertEquals(numMessages, replies.size()); + Assert.assertEquals(numMessages, watches.size()); + + watchTimeout.sleep(); + watchTimeoutDenomination.sleep(); // for roundup error + assertNotDone(replies); + assertNotDone(watches); + + // unblock leader so that the transaction can be committed. + SimpleStateMachine4Testing.get(leader).unblockStartTransaction(); + LOG.info("unblock leader {}", leader.getId()); + + checkMajority(replies, watches, LOG); + checkTimeout(replies, watches, LOG); + + SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData(); + LOG.info("unblock follower {}", blockedFollower.getId()); + } + + static void checkTimeout(List<CompletableFuture<RaftClientReply>> replies, + List<CompletableFuture<WatchReplies>> watches, Logger LOG) throws Exception { + for(int i = 0; i < replies.size(); i++) { + final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + LOG.info("checkTimeout {}: receive {}", i, reply); + final long logIndex = reply.getLogIndex(); + Assert.assertTrue(reply.isSuccess()); + + final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS); + Assert.assertEquals(logIndex, watchReplies.logIndex); + + final RaftClientReply watchAllReply = watchReplies.getAll(); + assertNotReplicatedException(logIndex, ReplicationLevel.ALL, watchAllReply); + + final RaftClientReply watchAllCommittedReply = watchReplies.getAllCommitted(); + assertNotReplicatedException(logIndex, ReplicationLevel.ALL_COMMITTED, watchAllCommittedReply); + } + } + + static void assertNotReplicatedException(long logIndex, ReplicationLevel replication, RaftClientReply reply) { + Assert.assertFalse(reply.isSuccess()); + final NotReplicatedException nre = reply.getNotReplicatedException(); + Assert.assertNotNull(nre); + Assert.assertEquals(logIndex, nre.getLogIndex()); + Assert.assertEquals(replication, nre.getRequiredReplication()); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/d72d9c6e/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java index 06d9301..782d80d 100644 --- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java +++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java @@ -19,6 +19,7 @@ package org.apache.ratis.util; import org.junit.Test; +import java.sql.Time; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; @@ -31,7 +32,7 @@ import static org.junit.Assert.assertNotNull; public class TestTimeDuration { @Test(timeout = 1000) - public void testTimeDuration() throws Exception { + public void testTimeDuration() { Arrays.asList(TimeUnit.values()) .forEach(a -> assertNotNull(Abbreviation.valueOf(a.name()))); assertEquals(TimeUnit.values().length, Abbreviation.values().length); @@ -81,4 +82,19 @@ public class TestTimeDuration { assertEquals(240, parse("10 day", TimeUnit.HOURS)); assertEquals(2400, parse("100 days", TimeUnit.HOURS)); } + + @Test(timeout = 1000) + public void testRoundUp() { + final long nanosPerSecond = 1_000_000_000L; + final TimeDuration oneSecond = TimeDuration.valueOf(1, TimeUnit.SECONDS); + assertEquals(-nanosPerSecond, oneSecond.roundUp(-nanosPerSecond - 1)); + assertEquals(-nanosPerSecond, oneSecond.roundUp(-nanosPerSecond)); + assertEquals(0, oneSecond.roundUp(-nanosPerSecond + 1)); + assertEquals(0, oneSecond.roundUp(-1)); + assertEquals(0, oneSecond.roundUp(0)); + assertEquals(nanosPerSecond, oneSecond.roundUp(1)); + assertEquals(nanosPerSecond, oneSecond.roundUp(nanosPerSecond - 1)); + assertEquals(nanosPerSecond, oneSecond.roundUp(nanosPerSecond)); + assertEquals(2*nanosPerSecond, oneSecond.roundUp(nanosPerSecond + 1)); + } }
