This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 971c970 RATIS-574. Client should not use configured sleep interval in
RetryPolicy when it gets NotLeaderException. Contributed by Siddharth Wagle
971c970 is described below
commit 971c970a036993f0dedc6721cf52b5f0ca4531e7
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Jun 5 11:30:16 2019 +0800
RATIS-574. Client should not use configured sleep interval in RetryPolicy
when it gets NotLeaderException. Contributed by Siddharth Wagle
---
.../org/apache/ratis/client/impl/OrderedAsync.java | 36 ++++++++++++++++------
.../apache/ratis/client/impl/UnorderedAsync.java | 5 ++-
.../java/org/apache/ratis/retry/RetryPolicies.java | 2 +-
.../test/java/org/apache/ratis/RaftAsyncTests.java | 30 ++++++++++++++++++
4 files changed, 62 insertions(+), 11 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 7cb7813..1f22a1e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -24,10 +24,12 @@ import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
import org.apache.ratis.protocol.GroupMismatchException;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftException;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.IOUtils;
@@ -35,7 +37,6 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.SlidingWindow;
-import org.apache.ratis.util.function.FunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -165,28 +166,42 @@ class OrderedAsync {
return;
}
+ RaftClientRequest request = pending.newRequestImpl();
sendRequest(pending).thenAccept(reply -> {
if (f.isDone()) {
return;
}
if (reply == null) {
- final int attempt = pending.getAttemptCount();
- RaftClientRequest request = pending.getRequest();
- LOG.debug("schedule* attempt #{} with policy {} for {}", attempt, retryPolicy, request);
- client.getScheduler().onTimeout(retryPolicy.getSleepTime(attempt, request),
- () -> getSlidingWindow(request).retry(pending, this::sendRequestWithRetry),
- LOG, () -> "Failed* to retry " + request);
+ scheduleWithTimeout(pending, request, retryPolicy);
} else {
f.complete(reply);
}
- }).exceptionally(FunctionUtils.consumerAsNullFunction(f::completeExceptionally));
+ }).exceptionally(e -> {
+ e = JavaUtils.unwrapCompletionException(e);
+ if (e instanceof NotLeaderException) {
+ RetryPolicy noLeaderRetry = ((NotLeaderException) e).getSuggestedLeader() != null ?
+ RetryPolicies.retryForeverNoSleep() : retryPolicy;
+ scheduleWithTimeout(pending, request, noLeaderRetry);
+ return null;
+ }
+ f.completeExceptionally(e);
+ return null;
+ });
+ }
+
+ private void scheduleWithTimeout(PendingOrderedRequest pending, RaftClientRequest request, RetryPolicy retryPolicy) {
+ final int attempt = pending.getAttemptCount();
+ LOG.debug("schedule* attempt #{} with policy {} for {}", attempt, retryPolicy, request);
+ client.getScheduler().onTimeout(retryPolicy.getSleepTime(attempt, request),
+ () -> getSlidingWindow(request).retry(pending, this::sendRequestWithRetry),
+ LOG, () -> "Failed* to retry " + request);
}
private CompletableFuture<RaftClientReply> sendRequest(PendingOrderedRequest pending) {
final RetryPolicy retryPolicy = client.getRetryPolicy();
final CompletableFuture<RaftClientReply> f;
final RaftClientRequest request;
- try(AutoCloseableLock readLock = client.readLock()) {
+ try (AutoCloseableLock readLock = client.readLock()) {
if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) {
pending.setFirstRequest();
}
@@ -219,6 +234,9 @@ class OrderedAsync {
} else {
client.handleIOException(request, (IOException) e, null, this::resetSlidingWindow);
}
+ if (e instanceof NotLeaderException) {
+ throw new CompletionException(e);
+ }
return null;
}
failAllAsyncRequests(request, e);
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index d248298..b99e950 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -24,6 +24,7 @@ import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftException;
+import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.util.JavaUtils;
import org.slf4j.Logger;
@@ -80,7 +81,7 @@ public interface UnorderedAsync {
f.complete(reply);
return;
}
- final RetryPolicy retryPolicy = client.getRetryPolicy();
+ RetryPolicy retryPolicy = client.getRetryPolicy();
if (!retryPolicy.shouldRetry(attemptCount, request)) {
f.completeExceptionally(
client.noMoreRetries(request, attemptCount, replyException != null? replyException: e));
@@ -98,6 +99,8 @@ public interface UnorderedAsync {
if (e instanceof IOException) {
if (e instanceof NotLeaderException) {
client.handleNotLeaderException(request, (NotLeaderException) e, null);
+ retryPolicy = ((NotLeaderException) e).getSuggestedLeader() != null ?
+ RetryPolicies.retryForeverNoSleep() : retryPolicy;
} else if (e instanceof GroupMismatchException) {
f.completeExceptionally(e);
return;
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
index c171c27..7f59fc8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
@@ -131,7 +131,7 @@ public interface RetryPolicies {
@Override
public TimeDuration getSleepTime(int attemptCount, RaftClientRequest request) {
- return shouldRetry(attemptCount, request)? sleepTime: ZERO_MILLIS;
+ return shouldRetry(attemptCount, request) ? sleepTime: ZERO_MILLIS;
}
public int getMaxAttempts() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 7e10441..8ca2867 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.apache.ratis.protocol.StateMachineException;
@@ -52,12 +53,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
@@ -390,4 +393,31 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
//reset for the other tests
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime);
}
+
+ @Test(timeout = 30000)
+ public void testNoRetryWaitOnNotLeaderException() throws Exception {
+ final MiniRaftCluster cluster = newCluster(3);
+ cluster.initServers();
+ cluster.start();
+
+ final RaftServerImpl leader = waitForLeader(cluster);
+ // Order peers before leaders to try
+ List<RaftPeerId> peers = cluster.getPeers().stream()
+ .filter(p -> !p.getId().equals(leader.getId()))
+ .map(RaftPeer::getId).collect(Collectors.toList());
+
+ Assert.assertNotNull(peers);
+ Assert.assertEquals(2, peers.size());
+ Iterator<RaftPeerId> i = peers.listIterator();
+ RetryPolicy unlimitedRetry =
+ RetryPolicies.retryUpToMaximumCountWithFixedSleep(10, TimeDuration.valueOf(60, TimeUnit.SECONDS));
+
+ RaftPeerId first = i.next();
+ RaftPeerId second = i.next();
+ try (final RaftClient client = cluster.createClient(first, cluster.getGroup(), unlimitedRetry)) {
+ client.sendAsync(new SimpleMessage("abc")).get();
+ } finally {
+ cluster.shutdown();
+ }
+ }
}