Repository: incubator-ratis Updated Branches: refs/heads/master a0f19ceb2 -> 3fde3d2ac
RATIS-386. Raft Client Async API's should honor Retry Policy. Contributed by Shashikant Banerjee Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/3fde3d2a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/3fde3d2a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/3fde3d2a Branch: refs/heads/master Commit: 3fde3d2ac7c48c93d2dd1b18c560cee1d918046e Parents: a0f19ce Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Fri Nov 16 13:11:18 2018 -0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Fri Nov 16 13:11:18 2018 -0800 ---------------------------------------------------------------------- .../ratis/client/RaftClientConfigKeys.java | 10 --- .../ratis/client/impl/RaftClientImpl.java | 37 ++++++++--- .../apache/ratis/protocol/RaftClientReply.java | 9 ++- .../protocol/RaftRetryFailureException.java | 28 ++++++++ .../org/apache/ratis/retry/RetryPolicies.java | 68 +++++++++++++++----- .../org/apache/ratis/retry/RetryPolicy.java | 4 +- .../org/apache/ratis/grpc/GrpcConfigKeys.java | 4 +- .../java/org/apache/ratis/MiniRaftCluster.java | 28 ++++++-- .../java/org/apache/ratis/RaftAsyncTests.java | 30 +++++++++ .../org/apache/ratis/WatchRequestTests.java | 1 - .../java/org/apache/ratis/TestRetryPolicy.java | 2 +- 11 files changed, 176 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java index 10fc69d..b07dade 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java @@ -39,16 +39,6 @@ public interface RaftClientConfigKeys { interface Rpc { String PREFIX = RaftClientConfigKeys.PREFIX + ".rpc"; - String RETRY_INTERVAL_KEY = PREFIX + ".retryInterval"; - TimeDuration RETRY_INTERVAL_DEFAULT = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS); - static TimeDuration retryInterval(RaftProperties properties) { - return getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()), - RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT, getDefaultLog()); - } - static void setRetryInterval(RaftProperties properties, TimeDuration timeoutDuration) { - setTimeDuration(properties::setTimeDuration, RETRY_INTERVAL_KEY, timeoutDuration); - } - String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout"; TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS); static TimeDuration requestTimeout(RaftProperties properties) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index c49a360..36508b9 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -52,6 +52,7 @@ final class RaftClientImpl implements RaftClient { private final long seqNum; private final LongFunction<RaftClientRequest> requestConstructor; private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>(); + private volatile int attemptCount; PendingAsyncRequest(long seqNum, LongFunction<RaftClientRequest> requestConstructor) { this.seqNum = seqNum; @@ -59,6 +60,7 @@ final class RaftClientImpl implements RaftClient { } RaftClientRequest newRequest() { + attemptCount++; return requestConstructor.apply(seqNum); } @@ -81,6 +83,10 @@ final class RaftClientImpl implements RaftClient { return replyFuture; } + public int getAttemptCount() { + return attemptCount; + } + @Override public String toString() { return "[seq=" + getSeqNum() + "]"; @@ -91,7 +97,6 @@ final class RaftClientImpl implements RaftClient { private final RaftClientRpc clientRpc; private final Collection<RaftPeer> peers; private final RaftGroupId groupId; - private final TimeDuration retryInterval; private final RetryPolicy retryPolicy; private volatile RaftPeerId leaderId; @@ -110,7 +115,6 @@ final class RaftClientImpl implements RaftClient { this.groupId = group.getGroupId(); this.leaderId = leaderId != null? leaderId : !peers.isEmpty()? peers.iterator().next().getId(): null; - this.retryInterval = RaftClientConfigKeys.Rpc.retryInterval(properties); Preconditions.assertTrue(retryPolicy != null, "retry policy can't be null"); this.retryPolicy = retryPolicy; @@ -268,10 +272,10 @@ final class RaftClientImpl implements RaftClient { PendingAsyncRequest pending) { final RaftClientRequest request = pending.newRequest(); final CompletableFuture<RaftClientReply> f = pending.getReplyFuture(); - return sendRequestAsync(request).thenCompose(reply -> { + return sendRequestAsync(request, pending.getAttemptCount()).thenCompose(reply -> { if (reply == null) { - LOG.debug("schedule a retry in {} for {}", retryInterval, request); - scheduler.onTimeout(retryInterval, + LOG.debug("schedule attempt #{} with policy {} for {}", pending.getAttemptCount(), retryPolicy, request); + scheduler.onTimeout(retryPolicy.getSleepTime(), () -> getSlidingWindow(request).retry(pending, this::sendRequestWithRetryAsync), LOG, () -> "Failed to retry " + request); } else { @@ -284,13 +288,13 @@ final class RaftClientImpl implements RaftClient { private RaftClientReply sendRequestWithRetry( Supplier<RaftClientRequest> supplier) throws InterruptedIOException, StateMachineException, GroupMismatchException { - for(int retryCount = 0;; retryCount++) { + for(int attemptCount = 0;; attemptCount++) { final RaftClientRequest request = supplier.get(); final RaftClientReply reply = sendRequest(request); if (reply != null) { return reply; } - if (!retryPolicy.shouldRetry(retryCount)) { + if (!retryPolicy.shouldRetry(attemptCount)) { return null; } try { @@ -302,7 +306,7 @@ final class RaftClientImpl implements RaftClient { } private CompletableFuture<RaftClientReply> sendRequestAsync( - RaftClientRequest request) { + RaftClientRequest request, int attemptCount) { LOG.debug("{}: send* {}", clientId, request); return clientRpc.sendRequestAsync(request).thenApply(reply -> { LOG.debug("{}: receive* {}", clientId, reply); @@ -310,6 +314,8 @@ final class RaftClientImpl implements RaftClient { if (reply != null) { getSlidingWindow(request).receiveReply( request.getSeqNum(), reply, this::sendRequestWithRetryAsync); + } else if (!retryPolicy.shouldRetry(attemptCount)) { + return handleAsyncRetry(request, attemptCount); } return reply; }).exceptionally(e -> { @@ -322,6 +328,12 @@ final class RaftClientImpl implements RaftClient { if (e instanceof GroupMismatchException) { throw new CompletionException(e); } else if (e instanceof IOException) { + // once the retryLimit is hit, just remove the request from the + // sliding window and throw an exception. The exception thrown here will + // make sure its not retried any more with sendRequestWithRetryAsync call. + if (!retryPolicy.shouldRetry(attemptCount)) { + return handleAsyncRetry(request, attemptCount); + } handleIOException(request, (IOException)e, null); } else { throw new CompletionException(e); @@ -330,6 +342,15 @@ final class RaftClientImpl implements RaftClient { }); } + private RaftClientReply handleAsyncRetry(RaftClientRequest request, int attemptCount) { + RaftClientReply reply = new RaftClientReply(request, + new RaftRetryFailureException( + "Failed " + request + " for " + attemptCount + " attempts with " + retryPolicy), null); + getSlidingWindow(request).receiveReply( + request.getSeqNum(), reply, this::sendRequestWithRetryAsync); + return reply; + } + private RaftClientReply sendRequest(RaftClientRequest request) throws StateMachineException, GroupMismatchException { LOG.debug("{}: send {}", clientId, request); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java index 0ec9f75..7b3979b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java @@ -67,8 +67,8 @@ public class RaftClientReply extends RaftClientMessage { Preconditions.assertTrue(!success, () -> "Inconsistent parameters: success && exception != null: " + this); Preconditions.assertTrue(ReflectionUtils.isInstance(exception, - NotLeaderException.class, NotReplicatedException.class, StateMachineException.class), - () -> "Unexpected exception class: " + this); + NotLeaderException.class, NotReplicatedException.class, StateMachineException.class, + RaftRetryFailureException.class), () -> "Unexpected exception class: " + this); } } @@ -143,4 +143,9 @@ public class RaftClientReply extends RaftClientMessage { public StateMachineException getStateMachineException() { return JavaUtils.cast(exception, StateMachineException.class); } + + /** If this reply has {@link RaftRetryFailureException}, return it; otherwise return null. */ + public RaftRetryFailureException getRetryFailureException() { + return JavaUtils.cast(exception, RaftRetryFailureException.class); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java new file mode 100644 index 0000000..690e96b --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +/** + * Retry failure as per the retryPolicy defined. + */ +public class RaftRetryFailureException extends RaftException { + + public RaftRetryFailureException(String msg) { + super(msg); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java index 1f2df97..b405f81 100644 --- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java +++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java @@ -35,11 +35,18 @@ public interface RetryPolicies { } /** + * Keep retrying forever with fixed sleep. + */ + static RetryPolicy retryForeverWithSleep(TimeDuration sleepTime) { + return new RetryForeverWithSleep(sleepTime); + } + + /** * Keep trying a limited number of times, waiting a fixed time between attempts, * and then fail by re-throwing the exception. */ - static RetryPolicy retryUpToMaximumCountWithFixedSleep(int maxRetries, TimeDuration sleepTime) { - return new RetryLimited(maxRetries, sleepTime); + static RetryPolicy retryUpToMaximumCountWithFixedSleep(int maxAttempts, TimeDuration sleepTime) { + return new RetryLimited(maxAttempts, sleepTime); } class Constants { @@ -51,7 +58,7 @@ public interface RetryPolicies { private RetryForeverNoSleep() {} @Override - public boolean shouldRetry(int retryCount) { + public boolean shouldRetry(int attemptCount) { return true; } @@ -65,7 +72,7 @@ public interface RetryPolicies { private NoRetry() {} @Override - public boolean shouldRetry(int retryCount) { + public boolean shouldRetry(int attemptCount) { return false; } @@ -75,8 +82,39 @@ public interface RetryPolicies { } } + class RetryForeverWithSleep implements RetryPolicy { + private final TimeDuration sleepTime; + + private String myString; + + RetryForeverWithSleep(TimeDuration sleepTime) { + if (sleepTime.isNegative()) { + throw new IllegalArgumentException( + "sleepTime = " + sleepTime.getDuration() + " < 0"); + } + this.sleepTime = sleepTime; + } + + @Override + public TimeDuration getSleepTime() { + return sleepTime; + } + + @Override + public boolean shouldRetry(int attemptCount) { + return true; + } + + @Override + public String toString() { + if (myString == null) { + myString = getClass().getSimpleName() + "(sleepTime = " + sleepTime + ")"; + } + return myString; + } + } /** - * Retry up to maxRetries. + * Retry up to maxAttempts. * The actual sleep time of the n-th retry is f(n, sleepTime), * where f is a function provided by the subclass implementation. * @@ -84,21 +122,21 @@ public interface RetryPolicies { * otherwise, the subclass must override hashCode(), equals(..) and toString(). */ class RetryLimited implements RetryPolicy { - private final int maxRetries; + private final int maxAttempts; private final TimeDuration sleepTime; private String myString; - RetryLimited(int maxRetries, TimeDuration sleepTime) { - if (maxRetries < 0) { - throw new IllegalArgumentException("maxRetries = " + maxRetries+" < 0"); + RetryLimited(int maxAttempts, TimeDuration sleepTime) { + if (maxAttempts < 0) { + throw new IllegalArgumentException("maxAttempts = " + maxAttempts+" < 0"); } if (sleepTime.isNegative()) { throw new IllegalArgumentException( "sleepTime = " + sleepTime.getDuration() + " < 0"); } - this.maxRetries = maxRetries; + this.maxAttempts = maxAttempts; this.sleepTime = sleepTime; } @@ -107,19 +145,19 @@ public interface RetryPolicies { return sleepTime; } - public int getMaxRetries() { - return maxRetries; + public int getMaxAttempts() { + return maxAttempts; } @Override - public boolean shouldRetry(int retryCount) { - return retryCount < maxRetries; + public boolean shouldRetry(int attemptCount) { + return attemptCount <= maxAttempts; } @Override public String toString() { if (myString == null) { - myString = getClass().getSimpleName() + "(maxRetries=" + maxRetries + myString = getClass().getSimpleName() + "(maxAttempts=" + maxAttempts + ", sleepTime=" + sleepTime + ")"; } return myString; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java index 3de972a..771e524 100644 --- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java +++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java @@ -30,10 +30,10 @@ public interface RetryPolicy { * Determines whether it is supposed to retry the connection if the operation * fails for some reason. * - * @param retryCount The number of times retried so far + * @param attemptCount The number of times attempted so far * @return true if it has to make another attempt, otherwise, false */ - boolean shouldRetry(int retryCount); + boolean shouldRetry(int attemptCount); /** * Returns the time duration for sleep in between the retries. http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java index ab9ea8c..5147d8c 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java @@ -17,13 +17,13 @@ */ package org.apache.ratis.grpc; -import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static org.apache.ratis.conf.ConfUtils.*; @@ -89,7 +89,7 @@ public interface GrpcConfigKeys { } String RETRY_INTERVAL_KEY = PREFIX + ".retry.interval"; - TimeDuration RETRY_INTERVAL_DEFAULT = RaftClientConfigKeys.Rpc.RETRY_INTERVAL_DEFAULT; + TimeDuration RETRY_INTERVAL_DEFAULT = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS); static TimeDuration retryInterval(RaftProperties properties) { return getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()), RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT, getDefaultLog()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 0e352f4..0cf9449 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -21,6 +21,8 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.*; +import org.apache.ratis.retry.RetryPolicies; +import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; @@ -61,6 +63,8 @@ public abstract class MiniRaftCluster implements Closeable { public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName(); public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class"; private static final StateMachine.Registry STATEMACHINE_REGISTRY_DEFAULT = gid -> new BaseStateMachine(); + private static final TimeDuration RETRY_INTERVAL_DEFAULT = + TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); public static abstract class Factory<CLUSTER extends MiniRaftCluster> { public interface Get<CLUSTER extends MiniRaftCluster> { @@ -155,6 +159,7 @@ public abstract class MiniRaftCluster implements Closeable { protected final Map<RaftPeerId, RaftPeer> peers = new ConcurrentHashMap<>(); private volatile StateMachine.Registry stateMachineRegistry = null; + private volatile TimeDuration retryInterval; private final Timer timer; @@ -532,6 +537,10 @@ public abstract class MiniRaftCluster implements Closeable { return getServerStream(groupId).filter(RaftServerImpl::isAlive); } + private RetryPolicy getDefaultRetryPolicy() { + return RetryPolicies.retryForeverWithSleep(RETRY_INTERVAL_DEFAULT); + } + public RaftServerProxy getServer(RaftPeerId id) { return servers.get(id); } @@ -572,18 +581,29 @@ public abstract class MiniRaftCluster implements Closeable { return createClient(leaderId, group); } + public RaftClient createClient(RaftPeerId leaderId, RetryPolicy retryPolicy) { + return createClient(leaderId, group, null, retryPolicy); + } + public RaftClient createClient(RaftPeerId leaderId, RaftGroup group) { - return createClient(leaderId, group, null); + return createClient(leaderId, group, null, getDefaultRetryPolicy()); + } + + public RaftClient createClient(RaftPeerId leaderId, RaftGroup group, + ClientId clientId) { + return createClient(leaderId, group, clientId, getDefaultRetryPolicy()); } - public RaftClient createClient(RaftPeerId leaderId, RaftGroup group, ClientId clientId) { - return RaftClient.newBuilder() + public RaftClient createClient(RaftPeerId leaderId, RaftGroup group, + ClientId clientId, RetryPolicy retryPolicy) { + RaftClient.Builder builder = RaftClient.newBuilder() .setClientId(clientId) .setRaftGroup(group) .setLeaderId(leaderId) .setProperties(properties) .setParameters(parameters) - .build(); + .setRetryPolicy(retryPolicy); + return builder.build(); } public RaftClientRequest newRaftClientRequest( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index c14515c..46630b4 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -23,8 +23,11 @@ import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.client.impl.RaftClientTestUtil; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.*; +import org.apache.ratis.retry.RetryPolicies; +import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RetryCacheTestUtil; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; @@ -90,6 +93,33 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba } @Test + public void testRequestAsyncWithRetryPolicy() throws Exception { + LOG.info("Running testWatchRequestsWithRetryPolicy"); + try(final CLUSTER cluster = newCluster(NUM_SERVERS)) { + int maxRetries = 3; + final RetryPolicy retryPolicy = RetryPolicies + .retryUpToMaximumCountWithFixedSleep(maxRetries, TimeDuration.valueOf(1, TimeUnit.SECONDS)); + cluster.start(); + final RaftClient writeClient = + cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId(), retryPolicy); + // blockStartTransaction of the leader so that no transaction can be committed MAJORITY + final RaftServerImpl leader = cluster.getLeader(); + LOG.info("block leader {}", leader.getId()); + SimpleStateMachine4Testing.get(leader).blockStartTransaction(); + RaftClientReply reply = + writeClient.sendAsync(RaftTestUtil.SimpleMessage.create(1)[0]).get(); + RaftRetryFailureException rfe = reply.getRetryFailureException(); + Assert.assertTrue(rfe != null); + Assert.assertTrue(rfe.getMessage().contains(retryPolicy.toString())); + // unblock leader so that the next transaction can be committed. + SimpleStateMachine4Testing.get(leader).unblockStartTransaction(); + // make sure the the next request succeeds. This will ensure the first + // request completed + writeClient.sendAsync(RaftTestUtil.SimpleMessage.create(1)[0]).get(); + } + } + + @Test public void testAsyncRequestSemaphore() throws Exception { LOG.info("Running testAsyncRequestSemaphore"); final CLUSTER cluster = newCluster(NUM_SERVERS); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java index d1cb7e0..d8856e7 100644 --- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java @@ -60,7 +60,6 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> final RaftProperties p = getProperties(); p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class); - RaftClientConfigKeys.Rpc.setRetryInterval(p, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS)); } @Test http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java ---------------------------------------------------------------------- diff --git a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java b/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java index d481003..ff947d4 100644 --- a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java +++ b/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java @@ -37,6 +37,6 @@ public class TestRetryPolicy { boolean shouldRetry = retryPolicy.shouldRetry(1); Assert.assertTrue(shouldRetry); Assert.assertTrue(1000 == retryPolicy.getSleepTime().getDuration()); - Assert.assertFalse(retryPolicy.shouldRetry(2)); + Assert.assertFalse(retryPolicy.shouldRetry(3)); } } \ No newline at end of file
