This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new e2cbcc1  RATIS-580. Refactor retryPolicy for RaftClient to customize wait time based on exception. Contributed by Tsz Wo Nicholas Sze.
e2cbcc1 is described below

commit e2cbcc1b5538f50a2ec80facefe08f307fd05683
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed Nov 6 15:11:21 2019 +0530

    RATIS-580. Refactor retryPolicy for RaftClient to customize wait time based on exception. Contributed by Tsz Wo Nicholas Sze.
---
 .../org/apache/ratis/client/ClientRetryEvent.java  |  56 ++++++++++
 .../org/apache/ratis/client/impl/OrderedAsync.java |  20 ++--
 .../apache/ratis/client/impl/RaftClientImpl.java   |  18 ++--
 .../apache/ratis/client/impl/UnorderedAsync.java   |  13 ++-
 .../retry/RequestTypeDependentRetryPolicy.java     |  87 ++++++++++++++++
 .../java/org/apache/ratis/retry/RetryPolicies.java | 113 ++++-----------------
 .../java/org/apache/ratis/retry/RetryPolicy.java   |  48 ++++++---
 .../java/org/apache/ratis/util/TimeDuration.java   |   9 ++
 .../ratis/util/function/CheckedBiFunction.java     |  28 +++++
 .../test/java/org/apache/ratis/RaftAsyncTests.java |  56 +++++-----
 .../java/org/apache/ratis/WatchRequestTests.java   |   9 +-
 .../java/org/apache/ratis/TestRetryPolicy.java     |  44 +++++---
 12 files changed, 328 insertions(+), 173 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/ClientRetryEvent.java b/ratis-client/src/main/java/org/apache/ratis/client/ClientRetryEvent.java
new file mode 100644
index 0000000..90be9fb
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/ClientRetryEvent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client;
+
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.retry.RetryPolicy;
+
+/** An {@link RetryPolicy.Event} specific to client request failure. */
+public class ClientRetryEvent implements RetryPolicy.Event {
+  private final int attemptCount;
+  private final RaftClientRequest request;
+  private final Throwable cause;
+
+  public ClientRetryEvent(int attemptCount, RaftClientRequest request, 
Throwable cause) {
+    this.attemptCount = attemptCount;
+    this.request = request;
+    this.cause = cause;
+  }
+
+  public ClientRetryEvent(int attemptCount, RaftClientRequest request) {
+    this(attemptCount, request, null);
+  }
+
+  @Override
+  public int getAttemptCount() {
+    return attemptCount;
+  }
+
+  public RaftClientRequest getRequest() {
+    return request;
+  }
+
+  public Throwable getCause() {
+    return cause;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + ":attempt=" + attemptCount + 
",request=" + request + ",cause=" + cause;
+  }
+}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 79ee050..01e6771 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.client.impl;
 
+import org.apache.ratis.client.ClientRetryEvent;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
 import org.apache.ratis.conf.RaftProperties;
@@ -148,8 +149,8 @@ public final class OrderedAsync {
     
getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), t);
   }
 
-  private void handleAsyncRetryFailure(RaftClientRequest request, int 
attemptCount, Throwable throwable) {
-    failAllAsyncRequests(request, client.noMoreRetries(request, attemptCount, 
throwable));
+  private void handleAsyncRetryFailure(ClientRetryEvent event) {
+    failAllAsyncRequests(event.getRequest(), client.noMoreRetries(event));
   }
 
   CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, Message 
message, RaftPeerId server) {
@@ -209,7 +210,8 @@ public final class OrderedAsync {
 
   private void scheduleWithTimeout(PendingOrderedRequest pending, 
RaftClientRequest request, RetryPolicy retryPolicy) {
     final int attempt = pending.getAttemptCount();
-    final TimeDuration sleepTime = retryPolicy.getSleepTime(attempt, request);
+    final ClientRetryEvent event = new ClientRetryEvent(attempt, request);
+    final TimeDuration sleepTime = 
retryPolicy.handleAttemptFailure(event).getSleepTime();
     LOG.debug("schedule* attempt #{} with sleep {} and policy {} for {}", 
attempt, sleepTime, retryPolicy, request);
     scheduleWithTimeout(pending, sleepTime, getSlidingWindow(request));
   }
@@ -239,8 +241,11 @@ public final class OrderedAsync {
       if (reply != null) {
         getSlidingWindow(request).receiveReply(
             request.getSlidingWindowEntry().getSeqNum(), reply, 
this::sendRequestWithRetry);
-      } else if (!retryPolicy.shouldRetry(attemptCount, request)) {
-        handleAsyncRetryFailure(request, attemptCount, replyException);
+      } else {
+        final ClientRetryEvent event = new ClientRetryEvent(attemptCount, 
request, replyException);
+        if (!retryPolicy.handleAttemptFailure(event).shouldRetry()) {
+          handleAsyncRetryFailure(event);
+        }
       }
       return reply;
     }).exceptionally(e -> {
@@ -251,8 +256,9 @@ public final class OrderedAsync {
       }
       e = JavaUtils.unwrapCompletionException(e);
       if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
-        if (!retryPolicy.shouldRetry(attemptCount, request)) {
-          handleAsyncRetryFailure(request, attemptCount, e);
+        final ClientRetryEvent event = new ClientRetryEvent(attemptCount, 
request, e);
+        if (!retryPolicy.handleAttemptFailure(event).shouldRetry()) {
+          handleAsyncRetryFailure(event);
         } else {
           if (e instanceof NotLeaderException) {
             NotLeaderException nle = (NotLeaderException)e;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index e1df86b..1a24cd4 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.client.impl;
 
+import org.apache.ratis.client.ClientRetryEvent;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
@@ -252,23 +253,28 @@ final class RaftClientImpl implements RaftClient {
       } catch (IOException e) {
         ioe = e;
       }
-      if (!retryPolicy.shouldRetry(attemptCount, request)) {
-        throw (IOException)noMoreRetries(request, attemptCount, ioe);
+
+      final ClientRetryEvent event = new ClientRetryEvent(attemptCount, 
request, ioe);
+      final RetryPolicy.Action action = 
retryPolicy.handleAttemptFailure(event);
+      if (!action.shouldRetry()) {
+        throw (IOException)noMoreRetries(event);
       }
 
       try {
-        retryPolicy.getSleepTime(attemptCount, request).sleep();
+        action.getSleepTime().sleep();
       } catch (InterruptedException e) {
         throw new InterruptedIOException("retry policy=" + retryPolicy);
       }
     }
   }
 
-  Throwable noMoreRetries(RaftClientRequest request, int attemptCount, 
Throwable throwable) {
+  Throwable noMoreRetries(ClientRetryEvent event) {
+    final int attemptCount = event.getAttemptCount();
+    final Throwable throwable = event.getCause();
     if (attemptCount == 1 && throwable != null) {
       return throwable;
     }
-    return new RaftRetryFailureException(request, attemptCount, retryPolicy, 
throwable);
+    return new RaftRetryFailureException(event.getRequest(), attemptCount, 
retryPolicy, throwable);
   }
 
   private RaftClientReply sendRequest(RaftClientRequest request) throws 
IOException {
@@ -369,7 +375,7 @@ final class RaftClientImpl implements RaftClient {
             clientId, oldLeader, newLeader, ioe.getClass().getName());
         this.leaderId = newLeader;
       }
-      clientRpc.handleException(oldLeader, ioe, reconnect);
+      clientRpc.handleException(oldLeader, ioe, true);
     }
   }
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 8dee190..d7c7e11 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.client.impl;
 
+import org.apache.ratis.client.ClientRetryEvent;
 import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.GroupMismatchException;
@@ -82,9 +83,11 @@ public interface UnorderedAsync {
           return;
         }
         RetryPolicy retryPolicy = client.getRetryPolicy();
-        if (!retryPolicy.shouldRetry(attemptCount, request)) {
-          f.completeExceptionally(
-              client.noMoreRetries(request, attemptCount, replyException != 
null? replyException: e));
+        final ClientRetryEvent event = new ClientRetryEvent(attemptCount, 
request,
+            replyException != null? replyException: e);
+        final RetryPolicy.Action action = 
retryPolicy.handleAttemptFailure(event);
+        if (!action.shouldRetry()) {
+          f.completeExceptionally(client.noMoreRetries(event));
           return;
         }
 
@@ -116,10 +119,10 @@ public interface UnorderedAsync {
         }
 
         LOG.debug("schedule retry for attempt #{}, policy={}, request={}", 
attemptCount, retryPolicy, request);
-        client.getScheduler().onTimeout(retryPolicy.getSleepTime(attemptCount, 
request),
+        client.getScheduler().onTimeout(action.getSleepTime(),
             () -> sendRequestWithRetry(pending, client), LOG, () -> clientId + 
": Failed~ to retry " + request);
       } catch (Throwable t) {
-        LOG.error(clientId + ": XXX Failed " + request, t);
+        LOG.error(clientId + ": Failed " + request, t);
         f.completeExceptionally(t);
       }
     });
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/retry/RequestTypeDependentRetryPolicy.java b/ratis-client/src/main/java/org/apache/ratis/client/retry/RequestTypeDependentRetryPolicy.java
new file mode 100644
index 0000000..c4d7523
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/retry/RequestTypeDependentRetryPolicy.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.retry;
+
+import org.apache.ratis.client.ClientRetryEvent;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.Preconditions;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * A {@link org.apache.ratis.protocol.RaftClientRequest.Type} dependent {@link 
RetryPolicy}
+ * such that each type can be set to use an individual policy.
+ * When the policy is not set for a particular type,
+ * the {@link RetryPolicies#retryForeverNoSleep()} policy is used as the 
default.
+ */
+public final class RequestTypeDependentRetryPolicy implements RetryPolicy {
+  public static class Builder {
+    private final EnumMap<RaftProtos.RaftClientRequestProto.TypeCase, 
RetryPolicy> map
+        = new EnumMap<>(RaftProtos.RaftClientRequestProto.TypeCase.class);
+
+    /** Set the given policy for the given type. */
+    public Builder set(RaftProtos.RaftClientRequestProto.TypeCase type, 
RetryPolicy policy) {
+      final RetryPolicy previous = map.put(type, policy);
+      Preconditions.assertNull(previous, () -> "The type " + type + " is 
already set to " + previous);
+      return this;
+    }
+
+    public RequestTypeDependentRetryPolicy build() {
+      return new RequestTypeDependentRetryPolicy(map);
+    }
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  private final Map<RaftProtos.RaftClientRequestProto.TypeCase, RetryPolicy> 
map;
+  private final Supplier<String> myString;
+
+  private 
RequestTypeDependentRetryPolicy(EnumMap<RaftProtos.RaftClientRequestProto.TypeCase,
 RetryPolicy> map) {
+    this.map = Collections.unmodifiableMap(map);
+    this.myString = () -> {
+      final StringBuilder b = new 
StringBuilder(getClass().getSimpleName()).append("{");
+      map.forEach((key, value) -> 
b.append(key).append("->").append(value).append(", "));
+      b.setLength(b.length() - 2);
+      return b.append("}").toString();
+    };
+  }
+
+  @Override
+  public Action handleAttemptFailure(Event event) {
+    if (!(event instanceof ClientRetryEvent)) {
+      return RetryPolicies.retryForeverNoSleep().handleAttemptFailure(event);
+    }
+    final ClientRetryEvent clientEvent = (ClientRetryEvent) event;
+    return 
Optional.ofNullable(map.get(clientEvent.getRequest().getType().getTypeCase()))
+        .orElse(RetryPolicies.retryForeverNoSleep())
+        .handleAttemptFailure(event);
+  }
+
+  @Override
+  public String toString() {
+    return myString.get();
+  }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
index 7f59fc8..4518c11 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
@@ -17,15 +17,11 @@
  */
 package org.apache.ratis.retry;
 
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.function.Supplier;
 
 /**
  * A collection of {@link RetryPolicy} implementations
@@ -60,8 +56,8 @@ public interface RetryPolicies {
     private RetryForeverNoSleep() {}
 
     @Override
-    public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
-      return true;
+    public Action handleAttemptFailure(Event event) {
+      return RetryPolicy.RETRY_WITHOUT_SLEEP_ACTION;
     }
 
     @Override
@@ -74,8 +70,8 @@ public interface RetryPolicies {
     private NoRetry() {}
 
     @Override
-    public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
-      return false;
+    public Action handleAttemptFailure(Event event) {
+      return RetryPolicy.NO_RETRY_ACTION;
     }
 
     @Override
@@ -89,18 +85,13 @@ public interface RetryPolicies {
     private final TimeDuration sleepTime;
 
     private RetryForeverWithSleep(TimeDuration sleepTime) {
-      Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " + 
sleepTime.getDuration() + " < 0");
+      Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " + 
sleepTime + " < 0");
       this.sleepTime = sleepTime;
     }
 
     @Override
-    public TimeDuration getSleepTime(int attemptCount, RaftClientRequest 
request) {
-      return sleepTime;
-    }
-
-    @Override
-    public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
-      return true;
+    public Action handleAttemptFailure(Event event) {
+      return () -> sleepTime;
     }
 
     @Override
@@ -110,28 +101,20 @@ public interface RetryPolicies {
   }
 
   /** For any requests, keep retrying a limited number of attempts with a 
fixed sleep time between attempts. */
-  class RetryLimited implements RetryPolicy {
+  class RetryLimited extends RetryForeverWithSleep  {
     private final int maxAttempts;
-    private final TimeDuration sleepTime;
-
-    private String myString;
+    private final Supplier<String> myString;
 
     private RetryLimited(int maxAttempts, TimeDuration sleepTime) {
+      super(sleepTime);
+
       if (maxAttempts < 0) {
         throw new IllegalArgumentException("maxAttempts = " + maxAttempts+" < 
0");
       }
-      if (sleepTime.isNegative()) {
-        throw new IllegalArgumentException(
-            "sleepTime = " + sleepTime.getDuration() + " < 0");
-      }
 
       this.maxAttempts = maxAttempts;
-      this.sleepTime = sleepTime;
-    }
-
-    @Override
-    public TimeDuration getSleepTime(int attemptCount, RaftClientRequest 
request) {
-      return shouldRetry(attemptCount, request) ? sleepTime: ZERO_MILLIS;
+      this.myString = JavaUtils.memoize(() -> getClass().getSimpleName()
+          + "(maxAttempts=" + maxAttempts + ", sleepTime=" + sleepTime + ")");
     }
 
     public int getMaxAttempts() {
@@ -139,73 +122,13 @@ public interface RetryPolicies {
     }
 
     @Override
-    public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
-      return attemptCount < maxAttempts;
-    }
-
-    @Override
-    public String toString() {
-      if (myString == null) {
-        myString = getClass().getSimpleName() + "(maxAttempts=" + maxAttempts
-            + ", sleepTime=" + sleepTime + ")";
-      }
-      return myString;
-    }
-  }
-
-  /**
-   * A {@link RaftClientRequest.Type} dependent {@link RetryPolicy}
-   * such that each type can be set to use an individual policy.
-   * When the policy is not set for a particular type,
-   * the {@link #retryForeverNoSleep()} policy is used as the default.
-   */
-  class RequestTypeDependentRetry implements RetryPolicy {
-    public static class Builder {
-      private final EnumMap<RaftClientRequestProto.TypeCase, RetryPolicy> map
-          = new EnumMap<>(RaftClientRequestProto.TypeCase.class);
-
-      /** Set the given policy for the given type. */
-      public Builder set(RaftClientRequestProto.TypeCase type, RetryPolicy 
policy) {
-        final RetryPolicy previous = map.put(type, policy);
-        Preconditions.assertNull(previous, () -> "The type " + type + " is 
already set to " + previous);
-        return this;
-      }
-
-      public RequestTypeDependentRetry build() {
-        return new RequestTypeDependentRetry(map);
-      }
-    }
-
-    public static Builder newBuilder() {
-      return new Builder();
-    }
-
-    private final Map<RaftClientRequestProto.TypeCase, RetryPolicy> map;
-
-    private RequestTypeDependentRetry(EnumMap<RaftClientRequestProto.TypeCase, 
RetryPolicy> map) {
-      this.map = Collections.unmodifiableMap(map);
-    }
-
-    @Override
-    public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
-      return Optional.ofNullable(map.get(request.getType().getTypeCase()))
-          .orElse(retryForeverNoSleep())
-          .shouldRetry(attemptCount, request);
-    }
-
-    @Override
-    public TimeDuration getSleepTime(int attemptCount, RaftClientRequest 
request) {
-      return Optional.ofNullable(map.get(request.getType().getTypeCase()))
-          .orElse(retryForeverNoSleep())
-          .getSleepTime(attemptCount, request);
+    public Action handleAttemptFailure(Event event) {
+      return event.getAttemptCount() < maxAttempts? 
super.handleAttemptFailure(event): NO_RETRY_ACTION;
     }
 
     @Override
     public String toString() {
-      final StringBuilder b = new 
StringBuilder(getClass().getSimpleName()).append("{");
-      map.forEach((key, value) -> 
b.append(key).append("->").append(value).append(", "));
-      b.setLength(b.length() - 2);
-      return b.append("}").toString();
+      return myString.get();
     }
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
index 162f760..4137982 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
@@ -17,31 +17,47 @@
  */
 package org.apache.ratis.retry;
 
-import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.util.TimeDuration;
 
-import java.util.concurrent.TimeUnit;
-
 /**
  * Policy abstract for retrying.
  */
 public interface RetryPolicy {
-  TimeDuration ZERO_MILLIS = TimeDuration.valueOf(0, TimeUnit.MILLISECONDS);
+  Action NO_RETRY_ACTION = new Action() {
+    @Override
+    public boolean shouldRetry() {
+      return false;
+    }
+    @Override
+    public TimeDuration getSleepTime() {
+      return TimeDuration.ZERO;
+    }
+  };
+
+  Action RETRY_WITHOUT_SLEEP_ACTION = () -> TimeDuration.ZERO;
+
+  /** The action it should take. */
+  interface Action {
+    /** @return true if it has to make another attempt; otherwise, return 
false. */
+    default boolean shouldRetry() {
+      return true;
+    }
+
+    /** @return the sleep time period before the next attempt. */
+    TimeDuration getSleepTime();
+  }
+
+  /** The event triggered the failure. */
+  interface Event {
+    /** @return the number of attempts tried so far. */
+    int getAttemptCount();
+  }
 
   /**
    * Determines whether it is supposed to retry after the operation has failed.
    *
-   * @param attemptCount The number of times attempted so far.
-   * @param request The failed request.
-   * @return true if it has to make another attempt; otherwise, return false.
+   * @param event The failed event.
+   * @return the action it should take.
    */
-  boolean shouldRetry(int attemptCount, RaftClientRequest request);
-
-  /**
-   * @param attemptCount The number of times attempted so far.
-   * @return the {@link TimeDuration} to sleep in between the retries.
-   */
-  default TimeDuration getSleepTime(int attemptCount, RaftClientRequest 
request) {
-    return ZERO_MILLIS;
-  }
+  Action handleAttemptFailure(Event event);
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 49aea5e..90dbe60 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ratis.util;
 
+import org.apache.ratis.util.function.CheckedBiFunction;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -34,6 +36,7 @@ import java.util.function.LongUnaryOperator;
 public final class TimeDuration implements Comparable<TimeDuration> {
   public static final TimeDuration ZERO = valueOf(0, TimeUnit.NANOSECONDS);
   public static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, 
TimeUnit.SECONDS);
+  public static final TimeDuration ONE_MINUTE = TimeDuration.valueOf(1, 
TimeUnit.MINUTES);
 
   /** Abbreviations of {@link TimeUnit}. */
   public enum Abbreviation {
@@ -201,6 +204,12 @@ public final class TimeDuration implements Comparable<TimeDuration> {
     return valueOf(operator.applyAsLong(duration), unit);
   }
 
+  /** Apply the given function to the (duration, unit) of this object. */
+  public <OUTPUT, THROWABLE extends Throwable> OUTPUT apply(
+      CheckedBiFunction<Long, TimeUnit, OUTPUT, THROWABLE> function) throws 
THROWABLE {
+    return function.apply(getDuration(), getUnit());
+  }
+
   /** @return Is this {@link TimeDuration} negative? */
   public boolean isNegative() {
     return duration < 0;
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedBiFunction.java b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedBiFunction.java
new file mode 100644
index 0000000..24df140
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedBiFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util.function;
+
+/** BiFunction with a throws-clause. */
+@FunctionalInterface
+public interface CheckedBiFunction<LEFT, RIGHT, OUTPUT, THROWABLE extends 
Throwable> {
+  /**
+   * The same as {@link java.util.function.BiFunction#apply(Object, Object)}
+   * except that this method is declared with a throws-clause.
+   */
+  OUTPUT apply(LEFT left, RIGHT right) throws THROWABLE;
+}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index fc1f100..f44ec32 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -29,7 +29,6 @@ import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.apache.ratis.protocol.StateMachineException;
@@ -53,14 +52,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 
@@ -262,7 +260,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
   @Test
   public void testWithLoadAsync() throws Exception {
     runWithNewCluster(NUM_SERVERS,
-        cluster -> RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG));
+        cluster -> RaftBasicTests.testWithLoad(5, 500, true, cluster, LOG));
   }
 
   @Test
@@ -403,30 +401,38 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), 
oldExpiryTime);
   }
 
-  @Test(timeout = 30000)
+  @Test
   public void testNoRetryWaitOnNotLeaderException() throws Exception {
-    final MiniRaftCluster cluster = newCluster(3);
-    cluster.initServers();
-    cluster.start();
+    
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(), 
false);
+    runWithNewCluster(3, this::runTestNoRetryWaitOnNotLeaderException);
+    
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(), 
true);
+  }
 
+  private void runTestNoRetryWaitOnNotLeaderException(MiniRaftCluster cluster) 
throws Exception {
     final RaftServerImpl leader = waitForLeader(cluster);
-    // Order peers before leaders to try
-    List<RaftPeerId> peers = cluster.getPeers().stream()
-        .filter(p -> !p.getId().equals(leader.getId()))
-        .map(RaftPeer::getId).collect(Collectors.toList());
-
-    Assert.assertNotNull(peers);
-    Assert.assertEquals(2, peers.size());
-    Iterator<RaftPeerId> i = peers.listIterator();
-    RetryPolicy unlimitedRetry =
-        RetryPolicies.retryUpToMaximumCountWithFixedSleep(10, 
TimeDuration.valueOf(60, TimeUnit.SECONDS));
-
-    RaftPeerId first = i.next();
-    RaftPeerId second = i.next();
-    try (final RaftClient client = cluster.createClient(first, 
cluster.getGroup(), unlimitedRetry)) {
-      client.sendAsync(new SimpleMessage("abc")).get();
-    } finally {
-      cluster.shutdown();
+    final List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertNotNull(followers);
+    Assert.assertEquals(2, followers.size());
+    Assert.assertNotSame(leader, followers.get(0));
+    Assert.assertNotSame(leader, followers.get(1));
+
+    // send a message to make sure that the leader is ready
+    try (final RaftClient client = cluster.createClient(leader.getId())) {
+      final CompletableFuture<RaftClientReply> f = client.sendAsync(new 
SimpleMessage("first"));
+      FIVE_SECONDS.apply(f::get);
+    }
+
+    final RetryPolicy r = event -> () -> {
+      final IllegalStateException e = new IllegalStateException("Unexpected 
getSleepTime: " + event);
+      setFirstException(e);
+      throw e;
+    };
+
+    try (final RaftClient client = 
cluster.createClient(followers.get(0).getId(), cluster.getGroup(), r)) {
+      final CompletableFuture<RaftClientReply> f = client.sendAsync(new 
SimpleMessage("abc"));
+      FIVE_SECONDS.apply(f::get);
+    } catch (TimeoutException e) {
+      throw new AssertionError("Failed to get async result", e);
     }
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index a9868ba..3a80c80 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -57,7 +57,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
   }
 
   static final int NUM_SERVERS = 3;
-  static final int GET_TIMEOUT_SECOND = 5;
+  static final int GET_TIMEOUT_SECOND = 10;
 
   @Before
   public void setup() {
@@ -116,11 +116,10 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
   static void runTest(CheckedConsumer<TestParameters, Exception> testCase, 
MiniRaftCluster cluster, Logger LOG)
       throws Exception {
     try(final RaftClient client = 
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) {
-      final int[] numMessages = {1, 10, 100};
-      for(int i = 0; i < 5; i++) {
-        final int n = 
numMessages[ThreadLocalRandom.current().nextInt(numMessages.length)];
+      final int[] numMessages = {1, 10, 20};
+      for(int n : numMessages) {
         final TestParameters p = new TestParameters(n, client, cluster, LOG);
-        LOG.info("{}) {}, {}", i, p, cluster.printServers());
+        LOG.info("{}) {}, {}", n, p, cluster.printServers());
         testCase.accept(p);
       }
     }
diff --git a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java 
b/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
index 4049f6b..967a907 100644
--- a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ratis;
 
+import org.apache.ratis.client.ClientRetryEvent;
+import org.apache.ratis.client.retry.RequestTypeDependentRetryPolicy;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.ClientId;
@@ -37,21 +39,24 @@ public class TestRetryPolicy extends BaseTest {
     final int n = 4;
     final TimeDuration sleepTime = HUNDRED_MILLIS;
     final RetryPolicy policy = 
RetryPolicies.retryUpToMaximumCountWithFixedSleep(n, sleepTime);
-    final RaftClientRequest request = 
newRaftClientRequest(RaftClientRequest.readRequestType());
     for(int i = 1; i < 2*n; i++) {
+      final int attempt = i;
+      final RetryPolicy.Event event = () -> attempt;
+      final RetryPolicy.Action action = policy.handleAttemptFailure(event);
+
       final boolean expected = i < n;
-      Assert.assertEquals(expected, policy.shouldRetry(i, request));
+      Assert.assertEquals(expected, action.shouldRetry());
       if (expected) {
-        Assert.assertEquals(sleepTime, policy.getSleepTime(i, request));
+        Assert.assertEquals(sleepTime, action.getSleepTime());
       } else {
-        Assert.assertEquals(0L, policy.getSleepTime(i, request).getDuration());
+        Assert.assertEquals(0L, action.getSleepTime().getDuration());
       }
     }
   }
 
   @Test
   public void testRequestTypeDependentRetry() {
-    final RetryPolicies.RequestTypeDependentRetry.Builder b = 
RetryPolicies.RequestTypeDependentRetry.newBuilder();
+    final RequestTypeDependentRetryPolicy.Builder b = 
RequestTypeDependentRetryPolicy.newBuilder();
     final int n = 4;
     final TimeDuration writeSleep = HUNDRED_MILLIS;
     final RetryPolicies.RetryLimited writePolicy = 
RetryPolicies.retryUpToMaximumCountWithFixedSleep(n, writeSleep);
@@ -67,26 +72,37 @@ public class TestRetryPolicy extends BaseTest {
         RaftClientRequest.watchRequestType(1, ReplicationLevel.MAJORITY));
     for(int i = 1; i < 2*n; i++) {
       { //write
+        final ClientRetryEvent event = new ClientRetryEvent(i, writeRequest);
+        final RetryPolicy.Action action = policy.handleAttemptFailure(event);
+
         final boolean expected = i < n;
-        Assert.assertEquals(expected, policy.shouldRetry(i, writeRequest));
+        Assert.assertEquals(expected, action.shouldRetry());
         if (expected) {
-          Assert.assertEquals(writeSleep, policy.getSleepTime(i, 
writeRequest));
+          Assert.assertEquals(writeSleep, action.getSleepTime());
         } else {
-          Assert.assertEquals(0L, policy.getSleepTime(i, 
writeRequest).getDuration());
+          Assert.assertEquals(0L, action.getSleepTime().getDuration());
         }
       }
 
       { //read and stale read are using default
-        Assert.assertTrue(policy.shouldRetry(i, readRequest));
-        Assert.assertEquals(0L, policy.getSleepTime(i, 
readRequest).getDuration());
+        final ClientRetryEvent event = new ClientRetryEvent(i, readRequest);
+        final RetryPolicy.Action action = policy.handleAttemptFailure(event);
+        Assert.assertTrue(action.shouldRetry());
+        Assert.assertEquals(0L, action.getSleepTime().getDuration());
+      }
 
-        Assert.assertTrue(policy.shouldRetry(i, staleReadRequest));
-        Assert.assertEquals(0L, policy.getSleepTime(i, 
staleReadRequest).getDuration());
+      {
+        final ClientRetryEvent event = new ClientRetryEvent(i, 
staleReadRequest);
+        final RetryPolicy.Action action = policy.handleAttemptFailure(event);
+        Assert.assertTrue(action.shouldRetry());
+        Assert.assertEquals(0L, action.getSleepTime().getDuration());
       }
 
       { //watch has no retry
-        Assert.assertFalse(policy.shouldRetry(i, watchRequest));
-        Assert.assertEquals(0L, policy.getSleepTime(i, 
watchRequest).getDuration());
+        final ClientRetryEvent event = new ClientRetryEvent(i, watchRequest);
+        final RetryPolicy.Action action = policy.handleAttemptFailure(event);
+        Assert.assertFalse(action.shouldRetry());
+        Assert.assertEquals(0L, action.getSleepTime().getDuration());
       }
     }
 

Reply via email to