This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 971c970  RATIS-574. Client should not use configured sleep interval in RetryPolicy when it gets NotLeaderException.  Contributed by Siddharth Wagle
971c970 is described below

commit 971c970a036993f0dedc6721cf52b5f0ca4531e7
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Jun 5 11:30:16 2019 +0800

    RATIS-574. Client should not use configured sleep interval in RetryPolicy when it gets NotLeaderException.  Contributed by Siddharth Wagle
---
 .../org/apache/ratis/client/impl/OrderedAsync.java | 36 ++++++++++++++++------
 .../apache/ratis/client/impl/UnorderedAsync.java   |  5 ++-
 .../java/org/apache/ratis/retry/RetryPolicies.java |  2 +-
 .../test/java/org/apache/ratis/RaftAsyncTests.java | 30 ++++++++++++++++++
 4 files changed, 62 insertions(+), 11 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 7cb7813..1f22a1e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -24,10 +24,12 @@ import 
org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
 import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
 import org.apache.ratis.protocol.GroupMismatchException;
 import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.NotLeaderException;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftException;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.IOUtils;
@@ -35,7 +37,6 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.SlidingWindow;
-import org.apache.ratis.util.function.FunctionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -165,28 +166,42 @@ class OrderedAsync {
       return;
     }
 
+    RaftClientRequest request = pending.newRequestImpl();
     sendRequest(pending).thenAccept(reply -> {
       if (f.isDone()) {
         return;
       }
       if (reply == null) {
-        final int attempt = pending.getAttemptCount();
-        RaftClientRequest request = pending.getRequest();
-        LOG.debug("schedule* attempt #{} with policy {} for {}", attempt, 
retryPolicy, request);
-        client.getScheduler().onTimeout(retryPolicy.getSleepTime(attempt, 
request),
-            () -> getSlidingWindow(request).retry(pending, 
this::sendRequestWithRetry),
-            LOG, () -> "Failed* to retry " + request);
+        scheduleWithTimeout(pending, request, retryPolicy);
       } else {
         f.complete(reply);
       }
-    
}).exceptionally(FunctionUtils.consumerAsNullFunction(f::completeExceptionally));
+    }).exceptionally(e -> {
+      e = JavaUtils.unwrapCompletionException(e);
+      if (e instanceof NotLeaderException) {
+        RetryPolicy noLeaderRetry = ((NotLeaderException) 
e).getSuggestedLeader() != null ?
+            RetryPolicies.retryForeverNoSleep() : retryPolicy;
+        scheduleWithTimeout(pending, request, noLeaderRetry);
+        return null;
+      }
+      f.completeExceptionally(e);
+      return null;
+    });
+  }
+
+  private void scheduleWithTimeout(PendingOrderedRequest pending, 
RaftClientRequest request, RetryPolicy retryPolicy) {
+    final int attempt = pending.getAttemptCount();
+    LOG.debug("schedule* attempt #{} with policy {} for {}", attempt, 
retryPolicy, request);
+    client.getScheduler().onTimeout(retryPolicy.getSleepTime(attempt, request),
+        () -> getSlidingWindow(request).retry(pending, 
this::sendRequestWithRetry),
+        LOG, () -> "Failed* to retry " + request);
   }
 
   private CompletableFuture<RaftClientReply> sendRequest(PendingOrderedRequest 
pending) {
     final RetryPolicy retryPolicy = client.getRetryPolicy();
     final CompletableFuture<RaftClientReply> f;
     final RaftClientRequest request;
-    try(AutoCloseableLock readLock = client.readLock()) {
+    try (AutoCloseableLock readLock = client.readLock()) {
       if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) {
         pending.setFirstRequest();
       }
@@ -219,6 +234,9 @@ class OrderedAsync {
         } else {
           client.handleIOException(request, (IOException) e, null, 
this::resetSlidingWindow);
         }
+        if (e instanceof NotLeaderException) {
+          throw new CompletionException(e);
+        }
         return null;
       }
       failAllAsyncRequests(request, e);
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index d248298..b99e950 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -24,6 +24,7 @@ import org.apache.ratis.protocol.NotLeaderException;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftException;
+import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.util.JavaUtils;
 import org.slf4j.Logger;
@@ -80,7 +81,7 @@ public interface UnorderedAsync {
           f.complete(reply);
           return;
         }
-        final RetryPolicy retryPolicy = client.getRetryPolicy();
+        RetryPolicy retryPolicy = client.getRetryPolicy();
         if (!retryPolicy.shouldRetry(attemptCount, request)) {
           f.completeExceptionally(
               client.noMoreRetries(request, attemptCount, replyException != 
null? replyException: e));
@@ -98,6 +99,8 @@ public interface UnorderedAsync {
           if (e instanceof IOException) {
             if (e instanceof NotLeaderException) {
               client.handleNotLeaderException(request, (NotLeaderException) e, 
null);
+              retryPolicy = ((NotLeaderException) e).getSuggestedLeader() != 
null ?
+                  RetryPolicies.retryForeverNoSleep() : retryPolicy;
             } else if (e instanceof GroupMismatchException) {
               f.completeExceptionally(e);
               return;
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java 
b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
index c171c27..7f59fc8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
@@ -131,7 +131,7 @@ public interface RetryPolicies {
 
     @Override
     public TimeDuration getSleepTime(int attemptCount, RaftClientRequest 
request) {
-      return shouldRetry(attemptCount, request)? sleepTime: ZERO_MILLIS;
+      return shouldRetry(attemptCount, request) ? sleepTime: ZERO_MILLIS;
     }
 
     public int getMaxAttempts() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 7e10441..8ca2867 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.apache.ratis.protocol.StateMachineException;
@@ -52,12 +53,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 
@@ -390,4 +393,31 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
     //reset for the other tests
     RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), 
oldExpiryTime);
   }
+
+  @Test(timeout = 30000)
+  public void testNoRetryWaitOnNotLeaderException() throws Exception {
+    final MiniRaftCluster cluster = newCluster(3);
+    cluster.initServers();
+    cluster.start();
+
+    final RaftServerImpl leader = waitForLeader(cluster);
+    // Order peers before leaders to try
+    List<RaftPeerId> peers = cluster.getPeers().stream()
+        .filter(p -> !p.getId().equals(leader.getId()))
+        .map(RaftPeer::getId).collect(Collectors.toList());
+
+    Assert.assertNotNull(peers);
+    Assert.assertEquals(2, peers.size());
+    Iterator<RaftPeerId> i = peers.listIterator();
+    RetryPolicy unlimitedRetry =
+        RetryPolicies.retryUpToMaximumCountWithFixedSleep(10, 
TimeDuration.valueOf(60, TimeUnit.SECONDS));
+
+    RaftPeerId first = i.next();
+    RaftPeerId second = i.next();
+    try (final RaftClient client = cluster.createClient(first, 
cluster.getGroup(), unlimitedRetry)) {
+      client.sendAsync(new SimpleMessage("abc")).get();
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }

Reply via email to