Repository: incubator-ratis
Updated Branches:
  refs/heads/master a0f19ceb2 -> 3fde3d2ac


RATIS-386. Raft Client Async API's should honor Retry Policy.  Contributed by 
Shashikant Banerjee


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/3fde3d2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/3fde3d2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/3fde3d2a

Branch: refs/heads/master
Commit: 3fde3d2ac7c48c93d2dd1b18c560cee1d918046e
Parents: a0f19ce
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Fri Nov 16 13:11:18 2018 -0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Fri Nov 16 13:11:18 2018 -0800

----------------------------------------------------------------------
 .../ratis/client/RaftClientConfigKeys.java      | 10 ---
 .../ratis/client/impl/RaftClientImpl.java       | 37 ++++++++---
 .../apache/ratis/protocol/RaftClientReply.java  |  9 ++-
 .../protocol/RaftRetryFailureException.java     | 28 ++++++++
 .../org/apache/ratis/retry/RetryPolicies.java   | 68 +++++++++++++++-----
 .../org/apache/ratis/retry/RetryPolicy.java     |  4 +-
 .../org/apache/ratis/grpc/GrpcConfigKeys.java   |  4 +-
 .../java/org/apache/ratis/MiniRaftCluster.java  | 28 ++++++--
 .../java/org/apache/ratis/RaftAsyncTests.java   | 30 +++++++++
 .../org/apache/ratis/WatchRequestTests.java     |  1 -
 .../java/org/apache/ratis/TestRetryPolicy.java  |  2 +-
 11 files changed, 176 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index 10fc69d..b07dade 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -39,16 +39,6 @@ public interface RaftClientConfigKeys {
   interface Rpc {
     String PREFIX = RaftClientConfigKeys.PREFIX + ".rpc";
 
-    String RETRY_INTERVAL_KEY = PREFIX + ".retryInterval";
-    TimeDuration RETRY_INTERVAL_DEFAULT = TimeDuration.valueOf(300, 
TimeUnit.MILLISECONDS);
-    static TimeDuration retryInterval(RaftProperties properties) {
-      return 
getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()),
-          RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT, getDefaultLog());
-    }
-    static void setRetryInterval(RaftProperties properties, TimeDuration 
timeoutDuration) {
-      setTimeDuration(properties::setTimeDuration, RETRY_INTERVAL_KEY, 
timeoutDuration);
-    }
-
     String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout";
     TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(3000, 
TimeUnit.MILLISECONDS);
     static TimeDuration requestTimeout(RaftProperties properties) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index c49a360..36508b9 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -52,6 +52,7 @@ final class RaftClientImpl implements RaftClient {
     private final long seqNum;
     private final LongFunction<RaftClientRequest> requestConstructor;
     private final CompletableFuture<RaftClientReply> replyFuture = new 
CompletableFuture<>();
+    private volatile int attemptCount;
 
     PendingAsyncRequest(long seqNum, LongFunction<RaftClientRequest> 
requestConstructor) {
       this.seqNum = seqNum;
@@ -59,6 +60,7 @@ final class RaftClientImpl implements RaftClient {
     }
 
     RaftClientRequest newRequest() {
+      attemptCount++;
       return requestConstructor.apply(seqNum);
     }
 
@@ -81,6 +83,10 @@ final class RaftClientImpl implements RaftClient {
       return replyFuture;
     }
 
+    public int getAttemptCount() {
+      return attemptCount;
+    }
+
     @Override
     public String toString() {
       return "[seq=" + getSeqNum() + "]";
@@ -91,7 +97,6 @@ final class RaftClientImpl implements RaftClient {
   private final RaftClientRpc clientRpc;
   private final Collection<RaftPeer> peers;
   private final RaftGroupId groupId;
-  private final TimeDuration retryInterval;
   private final RetryPolicy retryPolicy;
 
   private volatile RaftPeerId leaderId;
@@ -110,7 +115,6 @@ final class RaftClientImpl implements RaftClient {
     this.groupId = group.getGroupId();
     this.leaderId = leaderId != null? leaderId
         : !peers.isEmpty()? peers.iterator().next().getId(): null;
-    this.retryInterval = RaftClientConfigKeys.Rpc.retryInterval(properties);
     Preconditions.assertTrue(retryPolicy != null, "retry policy can't be 
null");
     this.retryPolicy = retryPolicy;
 
@@ -268,10 +272,10 @@ final class RaftClientImpl implements RaftClient {
       PendingAsyncRequest pending) {
     final RaftClientRequest request = pending.newRequest();
     final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
-    return sendRequestAsync(request).thenCompose(reply -> {
+    return sendRequestAsync(request, 
pending.getAttemptCount()).thenCompose(reply -> {
       if (reply == null) {
-        LOG.debug("schedule a retry in {} for {}", retryInterval, request);
-        scheduler.onTimeout(retryInterval,
+        LOG.debug("schedule attempt #{} with policy {} for {}", 
pending.getAttemptCount(), retryPolicy, request);
+        scheduler.onTimeout(retryPolicy.getSleepTime(),
             () -> getSlidingWindow(request).retry(pending, 
this::sendRequestWithRetryAsync),
             LOG, () -> "Failed to retry " + request);
       } else {
@@ -284,13 +288,13 @@ final class RaftClientImpl implements RaftClient {
   private RaftClientReply sendRequestWithRetry(
       Supplier<RaftClientRequest> supplier)
       throws InterruptedIOException, StateMachineException, 
GroupMismatchException {
-    for(int retryCount = 0;; retryCount++) {
+    for(int attemptCount = 0;; attemptCount++) {
       final RaftClientRequest request = supplier.get();
       final RaftClientReply reply = sendRequest(request);
       if (reply != null) {
         return reply;
       }
-      if (!retryPolicy.shouldRetry(retryCount)) {
+      if (!retryPolicy.shouldRetry(attemptCount)) {
         return null;
       }
       try {
@@ -302,7 +306,7 @@ final class RaftClientImpl implements RaftClient {
   }
 
   private CompletableFuture<RaftClientReply> sendRequestAsync(
-      RaftClientRequest request) {
+      RaftClientRequest request, int attemptCount) {
     LOG.debug("{}: send* {}", clientId, request);
     return clientRpc.sendRequestAsync(request).thenApply(reply -> {
       LOG.debug("{}: receive* {}", clientId, reply);
@@ -310,6 +314,8 @@ final class RaftClientImpl implements RaftClient {
       if (reply != null) {
         getSlidingWindow(request).receiveReply(
             request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
+      } else if (!retryPolicy.shouldRetry(attemptCount)) {
+        return handleAsyncRetry(request, attemptCount);
       }
       return reply;
     }).exceptionally(e -> {
@@ -322,6 +328,12 @@ final class RaftClientImpl implements RaftClient {
       if (e instanceof GroupMismatchException) {
         throw new CompletionException(e);
       } else if (e instanceof IOException) {
+        // once the retryLimit is hit, just remove the request from the
+        // sliding window and throw an exception. The exception thrown here 
will
+        // make sure it's not retried any more with sendRequestWithRetryAsync 
call.
+        if (!retryPolicy.shouldRetry(attemptCount)) {
+          return handleAsyncRetry(request, attemptCount);
+        }
         handleIOException(request, (IOException)e, null);
       } else {
         throw new CompletionException(e);
@@ -330,6 +342,15 @@ final class RaftClientImpl implements RaftClient {
     });
   }
 
+  private RaftClientReply handleAsyncRetry(RaftClientRequest request, int 
attemptCount) {
+    RaftClientReply reply = new RaftClientReply(request,
+        new RaftRetryFailureException(
+            "Failed " + request + " for " + attemptCount + " attempts with " + 
retryPolicy), null);
+    getSlidingWindow(request).receiveReply(
+        request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
+    return reply;
+  }
+
   private RaftClientReply sendRequest(RaftClientRequest request)
       throws StateMachineException, GroupMismatchException {
     LOG.debug("{}: send {}", clientId, request);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 0ec9f75..7b3979b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -67,8 +67,8 @@ public class RaftClientReply extends RaftClientMessage {
       Preconditions.assertTrue(!success,
           () -> "Inconsistent parameters: success && exception != null: " + 
this);
       Preconditions.assertTrue(ReflectionUtils.isInstance(exception,
-          NotLeaderException.class, NotReplicatedException.class, 
StateMachineException.class),
-          () -> "Unexpected exception class: " + this);
+          NotLeaderException.class, NotReplicatedException.class, 
StateMachineException.class,
+          RaftRetryFailureException.class), () -> "Unexpected exception class: 
" + this);
     }
   }
 
@@ -143,4 +143,9 @@ public class RaftClientReply extends RaftClientMessage {
   public StateMachineException getStateMachineException() {
     return JavaUtils.cast(exception, StateMachineException.class);
   }
+
+  /** If this reply has {@link RaftRetryFailureException}, return it; 
otherwise return null. */
+  public RaftRetryFailureException getRetryFailureException() {
+    return JavaUtils.cast(exception, RaftRetryFailureException.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
new file mode 100644
index 0000000..690e96b
--- /dev/null
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRetryFailureException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * Retry failure as per the retryPolicy defined.
+ */
+public class RaftRetryFailureException extends RaftException {
+
+  public RaftRetryFailureException(String msg) {
+    super(msg);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java 
b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
index 1f2df97..b405f81 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
@@ -35,11 +35,18 @@ public interface RetryPolicies {
   }
 
   /**
+   * Keep retrying forever with fixed sleep.
+   */
+  static RetryPolicy retryForeverWithSleep(TimeDuration sleepTime) {
+    return new RetryForeverWithSleep(sleepTime);
+  }
+
+  /**
    * Keep trying a limited number of times, waiting a fixed time between 
attempts,
    * and then fail by re-throwing the exception.
    */
-  static RetryPolicy retryUpToMaximumCountWithFixedSleep(int maxRetries, 
TimeDuration sleepTime) {
-    return new RetryLimited(maxRetries, sleepTime);
+  static RetryPolicy retryUpToMaximumCountWithFixedSleep(int maxAttempts, 
TimeDuration sleepTime) {
+    return new RetryLimited(maxAttempts, sleepTime);
   }
 
   class Constants {
@@ -51,7 +58,7 @@ public interface RetryPolicies {
     private RetryForeverNoSleep() {}
 
     @Override
-    public boolean shouldRetry(int retryCount) {
+    public boolean shouldRetry(int attemptCount) {
       return true;
     }
 
@@ -65,7 +72,7 @@ public interface RetryPolicies {
     private NoRetry() {}
 
     @Override
-    public boolean shouldRetry(int retryCount) {
+    public boolean shouldRetry(int attemptCount) {
       return false;
     }
 
@@ -75,8 +82,39 @@ public interface RetryPolicies {
     }
   }
 
+  class RetryForeverWithSleep implements RetryPolicy {
+    private final TimeDuration sleepTime;
+
+    private String myString;
+
+    RetryForeverWithSleep(TimeDuration sleepTime) {
+      if (sleepTime.isNegative()) {
+        throw new IllegalArgumentException(
+            "sleepTime = " + sleepTime.getDuration() + " < 0");
+      }
+      this.sleepTime = sleepTime;
+    }
+
+    @Override
+    public TimeDuration getSleepTime() {
+      return sleepTime;
+    }
+
+    @Override
+    public boolean shouldRetry(int attemptCount) {
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      if (myString == null) {
+        myString = getClass().getSimpleName() + "(sleepTime = " + sleepTime + 
")";
+      }
+      return myString;
+    }
+  }
   /**
-   * Retry up to maxRetries.
+   * Retry up to maxAttempts.
    * The actual sleep time of the n-th retry is f(n, sleepTime),
    * where f is a function provided by the subclass implementation.
    *
@@ -84,21 +122,21 @@ public interface RetryPolicies {
    * otherwise, the subclass must override hashCode(), equals(..) and 
toString().
    */
   class RetryLimited implements RetryPolicy {
-    private final int maxRetries;
+    private final int maxAttempts;
     private final TimeDuration sleepTime;
 
     private String myString;
 
-    RetryLimited(int maxRetries, TimeDuration sleepTime) {
-      if (maxRetries < 0) {
-        throw new IllegalArgumentException("maxRetries = " + maxRetries+" < 
0");
+    RetryLimited(int maxAttempts, TimeDuration sleepTime) {
+      if (maxAttempts < 0) {
+        throw new IllegalArgumentException("maxAttempts = " + maxAttempts+" < 
0");
       }
       if (sleepTime.isNegative()) {
         throw new IllegalArgumentException(
             "sleepTime = " + sleepTime.getDuration() + " < 0");
       }
 
-      this.maxRetries = maxRetries;
+      this.maxAttempts = maxAttempts;
       this.sleepTime = sleepTime;
     }
 
@@ -107,19 +145,19 @@ public interface RetryPolicies {
       return sleepTime;
     }
 
-    public int getMaxRetries() {
-      return maxRetries;
+    public int getMaxAttempts() {
+      return maxAttempts;
     }
 
     @Override
-    public boolean shouldRetry(int retryCount) {
-      return retryCount < maxRetries;
+    public boolean shouldRetry(int attemptCount) {
+      return attemptCount <= maxAttempts;
     }
 
     @Override
     public String toString() {
       if (myString == null) {
-        myString = getClass().getSimpleName() + "(maxRetries=" + maxRetries
+        myString = getClass().getSimpleName() + "(maxAttempts=" + maxAttempts
             + ", sleepTime=" + sleepTime + ")";
       }
       return myString;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java 
b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
index 3de972a..771e524 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
@@ -30,10 +30,10 @@ public interface RetryPolicy {
    * Determines whether it is supposed to retry the connection if the operation
    * fails for some reason.
    *
-   * @param retryCount The number of times retried so far
+   * @param attemptCount The number of times attempted so far
    * @return true if it has to make another attempt, otherwise, false
    */
-  boolean shouldRetry(int retryCount);
+  boolean shouldRetry(int attemptCount);
 
   /**
    * Returns the time duration for sleep in between the retries.

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index ab9ea8c..5147d8c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -17,13 +17,13 @@
  */
 package org.apache.ratis.grpc;
 
-import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import static org.apache.ratis.conf.ConfUtils.*;
@@ -89,7 +89,7 @@ public interface GrpcConfigKeys {
     }
 
     String RETRY_INTERVAL_KEY = PREFIX + ".retry.interval";
-    TimeDuration RETRY_INTERVAL_DEFAULT = 
RaftClientConfigKeys.Rpc.RETRY_INTERVAL_DEFAULT;
+    TimeDuration RETRY_INTERVAL_DEFAULT = TimeDuration.valueOf(300, 
TimeUnit.MILLISECONDS);
     static TimeDuration retryInterval(RaftProperties properties) {
       return 
getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()),
           RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT, getDefaultLog());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java 
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 0e352f4..0cf9449 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -21,6 +21,8 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
@@ -61,6 +63,8 @@ public abstract class MiniRaftCluster implements Closeable {
   public static final String CLASS_NAME = 
MiniRaftCluster.class.getSimpleName();
   public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + 
".statemachine.class";
   private static final StateMachine.Registry STATEMACHINE_REGISTRY_DEFAULT = 
gid -> new BaseStateMachine();
+  private static final TimeDuration RETRY_INTERVAL_DEFAULT =
+      TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
 
   public static abstract class Factory<CLUSTER extends MiniRaftCluster> {
     public interface Get<CLUSTER extends MiniRaftCluster> {
@@ -155,6 +159,7 @@ public abstract class MiniRaftCluster implements Closeable {
   protected final Map<RaftPeerId, RaftPeer> peers = new ConcurrentHashMap<>();
 
   private volatile StateMachine.Registry stateMachineRegistry = null;
+  private volatile TimeDuration retryInterval;
 
   private final Timer timer;
 
@@ -532,6 +537,10 @@ public abstract class MiniRaftCluster implements Closeable 
{
     return getServerStream(groupId).filter(RaftServerImpl::isAlive);
   }
 
+  private RetryPolicy getDefaultRetryPolicy() {
+    return RetryPolicies.retryForeverWithSleep(RETRY_INTERVAL_DEFAULT);
+  }
+
   public RaftServerProxy getServer(RaftPeerId id) {
     return servers.get(id);
   }
@@ -572,18 +581,29 @@ public abstract class MiniRaftCluster implements 
Closeable {
     return createClient(leaderId, group);
   }
 
+  public RaftClient createClient(RaftPeerId leaderId, RetryPolicy retryPolicy) 
{
+    return createClient(leaderId, group, null, retryPolicy);
+  }
+
   public RaftClient createClient(RaftPeerId leaderId, RaftGroup group) {
-    return createClient(leaderId, group, null);
+    return createClient(leaderId, group, null, getDefaultRetryPolicy());
+  }
+
+  public RaftClient createClient(RaftPeerId leaderId, RaftGroup group,
+      ClientId clientId) {
+    return createClient(leaderId, group, clientId, getDefaultRetryPolicy());
   }
 
-  public RaftClient createClient(RaftPeerId leaderId, RaftGroup group, 
ClientId clientId) {
-    return RaftClient.newBuilder()
+  public RaftClient createClient(RaftPeerId leaderId, RaftGroup group,
+      ClientId clientId, RetryPolicy retryPolicy) {
+    RaftClient.Builder builder = RaftClient.newBuilder()
         .setClientId(clientId)
         .setRaftGroup(group)
         .setLeaderId(leaderId)
         .setProperties(properties)
         .setParameters(parameters)
-        .build();
+        .setRetryPolicy(retryPolicy);
+    return builder.build();
   }
 
   public RaftClientRequest newRaftClientRequest(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index c14515c..46630b4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -23,8 +23,11 @@ import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RetryCacheTestUtil;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
@@ -90,6 +93,33 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
   }
 
   @Test
+  public void testRequestAsyncWithRetryPolicy() throws Exception {
+    LOG.info("Running testWatchRequestsWithRetryPolicy");
+    try(final CLUSTER cluster = newCluster(NUM_SERVERS)) {
+     int maxRetries = 3;
+      final RetryPolicy retryPolicy = RetryPolicies
+          .retryUpToMaximumCountWithFixedSleep(maxRetries, 
TimeDuration.valueOf(1, TimeUnit.SECONDS));
+      cluster.start();
+      final RaftClient writeClient =
+          cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId(), 
retryPolicy);
+      // blockStartTransaction of the leader so that no transaction can be 
committed MAJORITY
+      final RaftServerImpl leader = cluster.getLeader();
+      LOG.info("block leader {}", leader.getId());
+      SimpleStateMachine4Testing.get(leader).blockStartTransaction();
+      RaftClientReply reply =
+          writeClient.sendAsync(RaftTestUtil.SimpleMessage.create(1)[0]).get();
+      RaftRetryFailureException rfe = reply.getRetryFailureException();
+      Assert.assertTrue(rfe != null);
+      Assert.assertTrue(rfe.getMessage().contains(retryPolicy.toString()));
+      // unblock leader so that the next transaction can be committed.
+      SimpleStateMachine4Testing.get(leader).unblockStartTransaction();
+      // make sure the next request succeeds. This will ensure the first
+      // request completed
+      writeClient.sendAsync(RaftTestUtil.SimpleMessage.create(1)[0]).get();
+      }
+    }
+
+  @Test
   public void testAsyncRequestSemaphore() throws Exception {
     LOG.info("Running testAsyncRequestSemaphore");
     final CLUSTER cluster = newCluster(NUM_SERVERS);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java 
b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index d1cb7e0..d8856e7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -60,7 +60,6 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     final RaftProperties p = getProperties();
     p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
         SimpleStateMachine4Testing.class, StateMachine.class);
-    RaftClientConfigKeys.Rpc.setRetryInterval(p, TimeDuration.valueOf(100, 
TimeUnit.MILLISECONDS));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3fde3d2a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java 
b/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
index d481003..ff947d4 100644
--- a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
@@ -37,6 +37,6 @@ public class TestRetryPolicy {
      boolean shouldRetry = retryPolicy.shouldRetry(1);
     Assert.assertTrue(shouldRetry);
     Assert.assertTrue(1000 == retryPolicy.getSleepTime().getDuration());
-    Assert.assertFalse(retryPolicy.shouldRetry(2));
+    Assert.assertFalse(retryPolicy.shouldRetry(3));
   }
 }
\ No newline at end of file

Reply via email to