Repository: incubator-ratis Updated Branches: refs/heads/master 3b0be0287 -> 00274fa39
RATIS-453. When retry failed on an async call, it should fails all the following calls in the sliding window. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/00274fa3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/00274fa3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/00274fa3 Branch: refs/heads/master Commit: 00274fa39073e359433005c96097efaa24024702 Parents: 3b0be02 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Tue Dec 11 13:21:41 2018 -0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Tue Dec 11 13:21:41 2018 -0800 ---------------------------------------------------------------------- .../ratis/client/impl/RaftClientImpl.java | 62 ++++++----- .../ratis/protocol/AlreadyClosedException.java | 6 +- .../org/apache/ratis/retry/RetryPolicies.java | 4 +- .../org/apache/ratis/retry/RetryPolicy.java | 5 +- .../org/apache/ratis/util/SlidingWindow.java | 41 ++++++- .../function/CheckedFunctionWithTimeout.java | 2 +- .../ratis/util/function/FunctionUtils.java | 34 ++++++ .../test/java/org/apache/ratis/BaseTest.java | 54 +++++----- .../grpc/client/GrpcClientProtocolClient.java | 3 +- .../java/org/apache/ratis/MiniRaftCluster.java | 9 +- .../java/org/apache/ratis/RaftAsyncTests.java | 106 +++++++++++++++---- .../java/org/apache/ratis/RaftTestUtil.java | 12 +++ 12 files changed, 252 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 4c73d45..58206bd 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -20,11 +20,12 @@ package org.apache.ratis.client.impl; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.client.RaftClientRpc; -import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.*; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; +import org.apache.ratis.protocol.*; +import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.util.*; +import org.apache.ratis.util.function.FunctionUtils; import java.io.IOException; import java.io.InterruptedIOException; @@ -78,11 +79,16 @@ final class RaftClientImpl implements RaftClient { replyFuture.complete(reply); } + @Override + public void fail(Exception e) { + replyFuture.completeExceptionally(e); + } + CompletableFuture<RaftClientReply> getReplyFuture() { return replyFuture; } - public int getAttemptCount() { + int getAttemptCount() { return attemptCount; } @@ -164,9 +170,10 @@ final class RaftClientImpl implements RaftClient { try { asyncRequestSemaphore.acquire(); } catch (InterruptedException e) { - throw new CompletionException(IOUtils.toInterruptedIOException( + return JavaUtils.completeExceptionally(IOUtils.toInterruptedIOException( "Interrupted when sending " + type + ", message=" + message, e)); } + final long callId = nextCallId(); final LongFunction<PendingAsyncRequest> constructor = seqNum -> new PendingAsyncRequest(seqNum, seq -> newRaftClientRequest(server, callId, seq, message, type)); @@ -268,11 +275,17 @@ final class RaftClientImpl implements RaftClient { peersInNewConf.filter(p -> !peers.contains(p))::iterator); } - private CompletableFuture<RaftClientReply> sendRequestWithRetryAsync( - PendingAsyncRequest pending) { - final RaftClientRequest request = pending.newRequest(); + private void sendRequestWithRetryAsync(PendingAsyncRequest pending) { final CompletableFuture<RaftClientReply> f = pending.getReplyFuture(); - return sendRequestAsync(request, pending.getAttemptCount()).thenCompose(reply -> { + if (f.isDone()) { + return; + } + + final RaftClientRequest request = pending.newRequest(); + sendRequestAsync(request, pending.getAttemptCount()).thenAccept(reply -> { + if (f.isDone()) { + return; + } if (reply == null) { LOG.debug("schedule attempt #{} with policy {} for {}", pending.getAttemptCount(), retryPolicy, request); scheduler.onTimeout(retryPolicy.getSleepTime(), @@ -281,8 +294,7 @@ final class RaftClientImpl implements RaftClient { } else { f.complete(reply); } - return f; - }); + }).exceptionally(FunctionUtils.consumerAsNullFunction(f::completeExceptionally)); } private RaftClientReply sendRequestWithRetry( @@ -315,7 +327,7 @@ final class RaftClientImpl implements RaftClient { getSlidingWindow(request).receiveReply( request.getSeqNum(), reply, this::sendRequestWithRetryAsync); } else if (!retryPolicy.shouldRetry(attemptCount)) { - return handleAsyncRetry(request, attemptCount); + handleAsyncRetryFailure(request, attemptCount); } return reply; }).exceptionally(e -> { @@ -325,30 +337,22 @@ final class RaftClientImpl implements RaftClient { LOG.debug("{}: Failed {} with {}", clientId, request, e); } e = JavaUtils.unwrapCompletionException(e); - if (e instanceof GroupMismatchException) { - throw new CompletionException(e); - } else if (e instanceof IOException) { - // once the retryLimit is hit, just remove the request from the - // sliding window and throw an exception. The exception thrown here will - // make sure its not retried any more with sendRequestWithRetryAsync call. + if (e instanceof IOException && !(e instanceof GroupMismatchException)) { if (!retryPolicy.shouldRetry(attemptCount)) { - return handleAsyncRetry(request, attemptCount); + handleAsyncRetryFailure(request, attemptCount); + } else { + handleIOException(request, (IOException) e, null); } - handleIOException(request, (IOException)e, null); - } else { - throw new CompletionException(e); + return null; } - return null; + throw new CompletionException(e); }); } - private RaftClientReply handleAsyncRetry(RaftClientRequest request, int attemptCount) { - RaftClientReply reply = new RaftClientReply(request, - new RaftRetryFailureException( - "Failed " + request + " for " + attemptCount + " attempts with " + retryPolicy), null); - getSlidingWindow(request).receiveReply( - request.getSeqNum(), reply, this::sendRequestWithRetryAsync); - return reply; + private void handleAsyncRetryFailure(RaftClientRequest request, int attemptCount) { + final RaftRetryFailureException rfe = new RaftRetryFailureException( + "Failed " + request + " for " + (attemptCount-1) + " attempts with " + retryPolicy); + getSlidingWindow(request).fail(request.getSeqNum(), rfe); } private RaftClientReply sendRequest(RaftClientRequest request) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java index 85888a0..f69173f 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -24,4 +24,8 @@ public class AlreadyClosedException extends RaftException { public AlreadyClosedException(String message) { super(message); } + + public AlreadyClosedException(String message, Throwable t) { + super(message, t); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java index b405f81..e5cdeaa 100644 --- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java +++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -45,7 +45,7 @@ public interface RetryPolicies { * Keep trying a limited number of times, waiting a fixed time between attempts, * and then fail by re-throwing the exception. */ - static RetryPolicy retryUpToMaximumCountWithFixedSleep(int maxAttempts, TimeDuration sleepTime) { + static RetryLimited retryUpToMaximumCountWithFixedSleep(int maxAttempts, TimeDuration sleepTime) { return new RetryLimited(maxAttempts, sleepTime); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java index 771e524..ba90435 100644 --- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java +++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; * Policy abstract for retrying. */ public interface RetryPolicy { + TimeDuration ZERO_MILLIS = TimeDuration.valueOf(0, TimeUnit.MILLISECONDS); /** * Determines whether it is supposed to retry the connection if the operation @@ -39,6 +40,6 @@ public interface RetryPolicy { * Returns the time duration for sleep in between the retries. */ default TimeDuration getSleepTime() { - return TimeDuration.valueOf(0, TimeUnit.MILLISECONDS); + return ZERO_MILLIS; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java index ca622dd..a616f07 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,6 +17,7 @@ */ package org.apache.ratis.util; +import org.apache.ratis.protocol.AlreadyClosedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +43,9 @@ public interface SlidingWindow { void setReply(REPLY reply); boolean hasReply(); + + default void fail(Exception e) { + } } /** A seqNum-to-request map, sorted by seqNum. */ @@ -169,6 +173,8 @@ public interface SlidingWindow { private long firstSeqNum = -1; /** Is the first request replied? */ private boolean firstReplied; + /** The exception, if there is any. */ + private Exception exception; public Client(Object name) { this.requests = new RequestMap<REQUEST, REPLY>(name) { @@ -206,6 +212,12 @@ public interface SlidingWindow { final long seqNum = nextSeqNum++; final REQUEST r = requestConstructor.apply(seqNum); + + if (exception != null) { + alreadyClosed(r, exception); + return r; + } + requests.putNewRequest(r); final boolean submitted = sendOrDelayRequest(r, sendMethod); @@ -302,6 +314,33 @@ public interface SlidingWindow { firstReplied = false; LOG.debug("After resetFirstSeqNum: {}", this); } + + /** Fail all requests starting from the given seqNum. */ + public synchronized void fail(final long startingSeqNum, Exception e) { + exception = e; + + boolean handled = false; + for(long i = startingSeqNum; i <= requests.lastSeqNum(); i++) { + final REQUEST request = requests.getNonRepliedRequest(i, "fail"); + if (request != null) { + if (request.getSeqNum() == startingSeqNum) { + request.fail(e); + } else { + alreadyClosed(request, e); + } + handled = true; + } + } + + if (handled) { + removeRepliedFromHead(); + } + } + + private void alreadyClosed(REQUEST request, Exception e) { + request.fail(new AlreadyClosedException(SlidingWindow.class.getSimpleName() + "$" + getClass().getSimpleName() + + " " + requests.getName() + " is closed.", e)); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java index fddfab2..48b6b9f 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java @@ -25,7 +25,7 @@ import java.util.concurrent.TimeoutException; @FunctionalInterface public interface CheckedFunctionWithTimeout<INPUT, OUTPUT, THROWABLE extends Throwable> { /** - * The same as {@link org.apache.ratis.util.CheckedFunction#apply(Object)} + * The same as {@link CheckedFunction#apply(Object)} * except that this method has a timeout parameter and throws {@link TimeoutException}. */ OUTPUT apply(INPUT input, TimeDuration timeout) throws TimeoutException, THROWABLE; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/main/java/org/apache/ratis/util/function/FunctionUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/FunctionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/function/FunctionUtils.java new file mode 100644 index 0000000..0e982cb --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/function/FunctionUtils.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util.function; + +import java.util.function.Consumer; +import java.util.function.Function; + +public interface FunctionUtils { + /** + * Convert the given consumer to a function with any output type + * such that the returned function always returns null. + */ + static <INPUT, OUTPUT> Function<INPUT, OUTPUT> consumerAsNullFunction(Consumer<INPUT> consumer) { + return input -> { + consumer.accept(input); + return null; + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-common/src/test/java/org/apache/ratis/BaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java index 3612d21..f7015b7 100644 --- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java +++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java @@ -22,6 +22,7 @@ import org.apache.ratis.conf.ConfUtils; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.function.CheckedRunnable; import org.junit.Assert; import org.junit.Rule; @@ -35,11 +36,14 @@ import java.io.IOException; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; public abstract class BaseTest { public final Logger LOG = LoggerFactory.getLogger(getClass()); + public static final TimeDuration HUNDRED_MILLIS = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); + { LogUtils.setLogLevel(ConfUtils.LOG, Level.WARN); LogUtils.setLogLevel(FileUtils.LOG, Level.TRACE); @@ -83,14 +87,14 @@ public abstract class BaseTest { @SafeVarargs public static void assertThrowable( String description, Throwable t, - Class<? extends Throwable> exceptedThrowableClass, Logger log, - Class<? extends Throwable>... exceptedCauseClasses) { + Class<? extends Throwable> expectedThrowableClass, Logger log, + Class<? extends Throwable>... expectedCauseClasses) { if (log != null) { log.info("The test \"" + description + "\" throws " + t.getClass().getSimpleName(), t); } - Assert.assertEquals(exceptedThrowableClass, t.getClass()); + Assert.assertEquals(expectedThrowableClass, t.getClass()); - for (Class<? extends Throwable> expectedCause : exceptedCauseClasses) { + for (Class<? extends Throwable> expectedCause : expectedCauseClasses) { final Throwable previous = t; t = Objects.requireNonNull(previous.getCause(), () -> "previous.getCause() == null for previous=" + previous); @@ -99,48 +103,46 @@ public abstract class BaseTest { } @SafeVarargs - public static void testFailureCase( + public static Throwable testFailureCase( String description, CheckedRunnable<?> testCode, - Class<? extends Throwable> exceptedThrowableClass, Logger log, - Class<? extends Throwable>... exceptedCauseClasses) { - boolean caught = false; + Class<? extends Throwable> expectedThrowableClass, Logger log, + Class<? extends Throwable>... expectedCauseClasses) { try { testCode.run(); } catch (Throwable t) { - caught = true; - assertThrowable(description, t, exceptedThrowableClass, log, exceptedCauseClasses); - } - if (!caught) { - Assert.fail("The test \"" + description + "\" does not throw anything."); + assertThrowable(description, t, expectedThrowableClass, log, expectedCauseClasses); + return t; } + throw new AssertionError("The test \"" + description + "\" does not throw anything."); } @SafeVarargs - public final void testFailureCase( + public final Throwable testFailureCase( String description, CheckedRunnable<?> testCode, - Class<? extends Throwable> exceptedThrowableClass, - Class<? extends Throwable>... exceptedCauseClasses) { - testFailureCase(description, testCode, exceptedThrowableClass, LOG, exceptedCauseClasses); + Class<? extends Throwable> expectedThrowableClass, + Class<? extends Throwable>... expectedCauseClasses) { + return testFailureCase(description, testCode, expectedThrowableClass, LOG, expectedCauseClasses); } @SafeVarargs - public static void testFailureCaseAsync( + public static Throwable testFailureCaseAsync( String description, Supplier<CompletableFuture<?>> testCode, - Class<? extends Throwable> exceptedThrowableClass, Logger log, - Class<? extends Throwable>... exceptedCauseClasses) { + Class<? extends Throwable> expectedThrowableClass, Logger log, + Class<? extends Throwable>... expectedCauseClasses) { try { testCode.get().join(); - Assert.fail("The test \"" + description + "\" does not throw anything."); } catch (Throwable t) { t = JavaUtils.unwrapCompletionException(t); - assertThrowable(description, t, exceptedThrowableClass, log, exceptedCauseClasses); + assertThrowable(description, t, expectedThrowableClass, log, expectedCauseClasses); + return t; } + throw new AssertionError("The test \"" + description + "\" does not throw anything."); } @SafeVarargs - public final void testFailureCaseAsync( - String description, Supplier<CompletableFuture<?>> testCode, Class<? extends Throwable> exceptedThrowableClass, - Class<? extends Throwable>... exceptedCauseClasses) { - testFailureCaseAsync(description, testCode, exceptedThrowableClass, LOG, exceptedCauseClasses); + public final Throwable testFailureCaseAsync( + String description, Supplier<CompletableFuture<?>> testCode, Class<? extends Throwable> expectedThrowableClass, + Class<? extends Throwable>... expectedCauseClasses) { + return testFailureCaseAsync(description, testCode, expectedThrowableClass, LOG, expectedCauseClasses); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java index cf239b6..8a1b111 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java @@ -35,6 +35,7 @@ import org.apache.ratis.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolService import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc; import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub; import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub; +import org.apache.ratis.protocol.AlreadyClosedException; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.NotLeaderException; import org.apache.ratis.protocol.RaftClientReply; @@ -201,7 +202,7 @@ public class GrpcClientProtocolClient implements Closeable { CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) { final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get(); if (map == null) { - return JavaUtils.completeExceptionally(new IOException("Already closed.")); + return JavaUtils.completeExceptionally(new AlreadyClosedException(getName() + " is closed.")); } final CompletableFuture<RaftClientReply> f = new CompletableFuture<>(); CollectionUtils.putNew(request.getCallId(), f, map, http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index f1f33e1..31faf35 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -103,11 +103,18 @@ public abstract class MiniRaftCluster implements Closeable { } default void runWithNewCluster(int numServers, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception { + runWithNewCluster(numServers, true, testCase); + } + + default void runWithNewCluster(int numServers, boolean startCluster, CheckedConsumer<CLUSTER, Exception> testCase) + throws Exception { final StackTraceElement caller = JavaUtils.getCallerStackTraceElement(); LOG.info("Running " + caller.getMethodName()); final CLUSTER cluster = newCluster(numServers); try { - cluster.start(); + if (startCluster) { + cluster.start(); + } testCase.accept(cluster); } catch(Throwable t) { LOG.error("Failed " + caller + ": " + cluster.printServers(), t); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index 0719976..3821f5f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -25,6 +25,7 @@ import org.apache.ratis.client.impl.RaftClientTestUtil; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.protocol.AlreadyClosedException; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftGroup; @@ -32,6 +33,7 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.RaftRetryFailureException; import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.retry.RetryPolicies; +import org.apache.ratis.retry.RetryPolicies.RetryLimited; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; @@ -42,6 +44,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferExce import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.function.CheckedRunnable; import org.junit.Assert; import org.junit.Test; @@ -52,6 +55,7 @@ import java.util.Comparator; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -95,31 +99,89 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba } } + static void assertRaftRetryFailureException(RaftRetryFailureException rfe, RetryPolicy retryPolicy, String name) { + Assert.assertNotNull(name + " does not have RaftRetryFailureException", rfe); + Assert.assertTrue(name + ": unexpected error message, rfe=" + rfe + ", retryPolicy=" + retryPolicy, + rfe.getMessage().contains(retryPolicy.toString())); + } + @Test - public void testRequestAsyncWithRetryPolicy() throws Exception { - runWithNewCluster(NUM_SERVERS, this::runTestRequestAsyncWithRetryPolicy); + public void testRequestAsyncWithRetryFailure() throws Exception { + runWithNewCluster(1, false, cluster -> runTestRequestAsyncWithRetryFailure(false, cluster)); } - void runTestRequestAsyncWithRetryPolicy(CLUSTER cluster) throws Exception { - final RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( - 3, TimeDuration.valueOf(1, TimeUnit.SECONDS)); - final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster); - - try(final RaftClient writeClient = cluster.createClient(leader.getId(), retryPolicy)) { - // blockStartTransaction of the leader so that no transaction can be committed MAJORITY - LOG.info("block leader {}", leader.getId()); - SimpleStateMachine4Testing.get(leader).blockStartTransaction(); - final SimpleMessage[] messages = SimpleMessage.create(2); - final RaftClientReply reply = writeClient.sendAsync(messages[0]).get(); - RaftRetryFailureException rfe = reply.getRetryFailureException(); - Assert.assertNotNull(rfe); - Assert.assertTrue(rfe.getMessage().contains(retryPolicy.toString())); - - // unblock leader so that the next transaction can be committed. - SimpleStateMachine4Testing.get(leader).unblockStartTransaction(); - // make sure the the next request succeeds. This will ensure the first - // request completed - writeClient.sendAsync(messages[1]).get(); + @Test + public void testRequestAsyncWithRetryFailureAfterInitialMessages() throws Exception { + runWithNewCluster(1, true, cluster -> runTestRequestAsyncWithRetryFailure(true, cluster)); + } + + void runTestRequestAsyncWithRetryFailure(boolean initialMessages, CLUSTER cluster) throws Exception { + final RetryLimited retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(10, HUNDRED_MILLIS); + + try(final RaftClient client = cluster.createClient(null, retryPolicy)) { + RaftPeerId leader = null; + if (initialMessages) { + // cluster is already started, send a few success messages + leader = RaftTestUtil.waitForLeader(cluster).getId(); + final SimpleMessage[] messages = SimpleMessage.create(10, "initial-"); + final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>(); + for (int i = 0; i < messages.length; i++) { + replies.add(client.sendAsync(messages[i])); + } + for (int i = 0; i < messages.length; i++) { + RaftTestUtil.assertSuccessReply(replies.get(i)); + } + + // kill the only server + cluster.killServer(leader); + } + + // now, either the cluster is not yet started or the server is killed. + final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>(); + { + final SimpleMessage[] messages = SimpleMessage.create(10); + int i = 0; + // send half of the calls without starting the cluster + for (; i < messages.length/2; i++) { + replies.add(client.sendAsync(messages[i])); + } + + // sleep most of the retry time + retryPolicy.getSleepTime().apply(t -> t * (retryPolicy.getMaxAttempts() - 1)).sleep(); + + // send another half of the calls without starting the cluster + for (; i < messages.length; i++) { + replies.add(client.sendAsync(messages[i])); + } + Assert.assertEquals(messages.length, replies.size()); + } + + // sleep again so that the first half calls will fail retries. + // the second half still have retry time remaining. + retryPolicy.getSleepTime().apply(t -> t*2).sleep(); + + if (leader != null) { + cluster.restartServer(leader, false); + } else { + cluster.start(); + } + + // all the calls should fail for ordering guarantee + for(int i = 0; i < replies.size(); i++) { + final CheckedRunnable<Exception> getReply = replies.get(i)::get; + final String name = "retry-failure-" + i; + if (i == 0) { + final Throwable t = testFailureCase(name, getReply, + ExecutionException.class, RaftRetryFailureException.class); + assertRaftRetryFailureException((RaftRetryFailureException) t.getCause(), retryPolicy, name); + } else { + testFailureCase(name, getReply, + ExecutionException.class, AlreadyClosedException.class, RaftRetryFailureException.class); + } + } + + testFailureCaseAsync("last-request", () -> client.sendAsync(new SimpleMessage("last")), + AlreadyClosedException.class, RaftRetryFailureException.class); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00274fa3/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index a96b917..6f0a20a 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -23,6 +23,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase; import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; @@ -51,6 +52,8 @@ import java.util.EnumMap; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BooleanSupplier; @@ -424,4 +427,13 @@ public interface RaftTestUtil { } return null; } + + static void assertSuccessReply(CompletableFuture<RaftClientReply> reply) throws Exception { + assertSuccessReply(reply.get(10, TimeUnit.SECONDS)); + } + + static void assertSuccessReply(RaftClientReply reply) { + Assert.assertNotNull("reply == null", reply); + Assert.assertTrue("reply is not success: " + reply, reply.isSuccess()); + } }
