This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new e2cbcc1 RATIS-580. Refactor retryPolicy for RaftClient to customize
wait time based on exception. Contributed by Tsz Wo Nicholas Sze.
e2cbcc1 is described below
commit e2cbcc1b5538f50a2ec80facefe08f307fd05683
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed Nov 6 15:11:21 2019 +0530
RATIS-580. Refactor retryPolicy for RaftClient to customize wait time based
on exception. Contributed by Tsz Wo Nicholas Sze.
---
.../org/apache/ratis/client/ClientRetryEvent.java | 56 ++++++++++
.../org/apache/ratis/client/impl/OrderedAsync.java | 20 ++--
.../apache/ratis/client/impl/RaftClientImpl.java | 18 ++--
.../apache/ratis/client/impl/UnorderedAsync.java | 13 ++-
.../retry/RequestTypeDependentRetryPolicy.java | 87 ++++++++++++++++
.../java/org/apache/ratis/retry/RetryPolicies.java | 113 ++++-----------------
.../java/org/apache/ratis/retry/RetryPolicy.java | 48 ++++++---
.../java/org/apache/ratis/util/TimeDuration.java | 9 ++
.../ratis/util/function/CheckedBiFunction.java | 28 +++++
.../test/java/org/apache/ratis/RaftAsyncTests.java | 56 +++++-----
.../java/org/apache/ratis/WatchRequestTests.java | 9 +-
.../java/org/apache/ratis/TestRetryPolicy.java | 44 +++++---
12 files changed, 328 insertions(+), 173 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/ClientRetryEvent.java
b/ratis-client/src/main/java/org/apache/ratis/client/ClientRetryEvent.java
new file mode 100644
index 0000000..90be9fb
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/ClientRetryEvent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client;
+
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.retry.RetryPolicy;
+
+/**
+ * A {@link RetryPolicy.Event} specific to client request failure.
+ * It carries the attempt count, the {@link RaftClientRequest} and, optionally, the cause.
+ */
+public class ClientRetryEvent implements RetryPolicy.Event {
+  private final int attemptCount;
+  private final RaftClientRequest request;
+  private final Throwable cause;
+
+  /** Construct an event without a cause, i.e. {@link #getCause()} returns null. */
+  public ClientRetryEvent(int attemptCount, RaftClientRequest request) {
+    this(attemptCount, request, null);
+  }
+
+  /**
+   * @param attemptCount the value to be returned by {@link #getAttemptCount()}.
+   * @param request the value to be returned by {@link #getRequest()}.
+   * @param cause the value to be returned by {@link #getCause()}; it may be null.
+   */
+  public ClientRetryEvent(int attemptCount, RaftClientRequest request, Throwable cause) {
+    this.attemptCount = attemptCount;
+    this.request = request;
+    this.cause = cause;
+  }
+
+  @Override
+  public int getAttemptCount() {
+    return attemptCount;
+  }
+
+  /** @return the request passed to the constructor. */
+  public RaftClientRequest getRequest() {
+    return request;
+  }
+
+  /** @return the cause passed to the constructor, or null if no cause was given. */
+  public Throwable getCause() {
+    return cause;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder(getClass().getSimpleName())
+        .append(":attempt=").append(attemptCount)
+        .append(",request=").append(request)
+        .append(",cause=").append(cause)
+        .toString();
+  }
+}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 79ee050..01e6771 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.client.impl;
+import org.apache.ratis.client.ClientRetryEvent;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
import org.apache.ratis.conf.RaftProperties;
@@ -148,8 +149,8 @@ public final class OrderedAsync {
getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), t);
}
- private void handleAsyncRetryFailure(RaftClientRequest request, int
attemptCount, Throwable throwable) {
- failAllAsyncRequests(request, client.noMoreRetries(request, attemptCount,
throwable));
+ private void handleAsyncRetryFailure(ClientRetryEvent event) {
+ failAllAsyncRequests(event.getRequest(), client.noMoreRetries(event));
}
CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, Message
message, RaftPeerId server) {
@@ -209,7 +210,8 @@ public final class OrderedAsync {
private void scheduleWithTimeout(PendingOrderedRequest pending,
RaftClientRequest request, RetryPolicy retryPolicy) {
final int attempt = pending.getAttemptCount();
- final TimeDuration sleepTime = retryPolicy.getSleepTime(attempt, request);
+ final ClientRetryEvent event = new ClientRetryEvent(attempt, request);
+ final TimeDuration sleepTime =
retryPolicy.handleAttemptFailure(event).getSleepTime();
LOG.debug("schedule* attempt #{} with sleep {} and policy {} for {}",
attempt, sleepTime, retryPolicy, request);
scheduleWithTimeout(pending, sleepTime, getSlidingWindow(request));
}
@@ -239,8 +241,11 @@ public final class OrderedAsync {
if (reply != null) {
getSlidingWindow(request).receiveReply(
request.getSlidingWindowEntry().getSeqNum(), reply,
this::sendRequestWithRetry);
- } else if (!retryPolicy.shouldRetry(attemptCount, request)) {
- handleAsyncRetryFailure(request, attemptCount, replyException);
+ } else {
+ final ClientRetryEvent event = new ClientRetryEvent(attemptCount,
request, replyException);
+ if (!retryPolicy.handleAttemptFailure(event).shouldRetry()) {
+ handleAsyncRetryFailure(event);
+ }
}
return reply;
}).exceptionally(e -> {
@@ -251,8 +256,9 @@ public final class OrderedAsync {
}
e = JavaUtils.unwrapCompletionException(e);
if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
- if (!retryPolicy.shouldRetry(attemptCount, request)) {
- handleAsyncRetryFailure(request, attemptCount, e);
+ final ClientRetryEvent event = new ClientRetryEvent(attemptCount,
request, e);
+ if (!retryPolicy.handleAttemptFailure(event).shouldRetry()) {
+ handleAsyncRetryFailure(event);
} else {
if (e instanceof NotLeaderException) {
NotLeaderException nle = (NotLeaderException)e;
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index e1df86b..1a24cd4 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.client.impl;
+import org.apache.ratis.client.ClientRetryEvent;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
@@ -252,23 +253,28 @@ final class RaftClientImpl implements RaftClient {
} catch (IOException e) {
ioe = e;
}
- if (!retryPolicy.shouldRetry(attemptCount, request)) {
- throw (IOException)noMoreRetries(request, attemptCount, ioe);
+
+ final ClientRetryEvent event = new ClientRetryEvent(attemptCount,
request, ioe);
+ final RetryPolicy.Action action =
retryPolicy.handleAttemptFailure(event);
+ if (!action.shouldRetry()) {
+ throw (IOException)noMoreRetries(event);
}
try {
- retryPolicy.getSleepTime(attemptCount, request).sleep();
+ action.getSleepTime().sleep();
} catch (InterruptedException e) {
throw new InterruptedIOException("retry policy=" + retryPolicy);
}
}
}
- Throwable noMoreRetries(RaftClientRequest request, int attemptCount,
Throwable throwable) {
+ Throwable noMoreRetries(ClientRetryEvent event) {
+ final int attemptCount = event.getAttemptCount();
+ final Throwable throwable = event.getCause();
if (attemptCount == 1 && throwable != null) {
return throwable;
}
- return new RaftRetryFailureException(request, attemptCount, retryPolicy,
throwable);
+ return new RaftRetryFailureException(event.getRequest(), attemptCount,
retryPolicy, throwable);
}
private RaftClientReply sendRequest(RaftClientRequest request) throws
IOException {
@@ -369,7 +375,7 @@ final class RaftClientImpl implements RaftClient {
clientId, oldLeader, newLeader, ioe.getClass().getName());
this.leaderId = newLeader;
}
- clientRpc.handleException(oldLeader, ioe, reconnect);
+ clientRpc.handleException(oldLeader, ioe, true);
}
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 8dee190..d7c7e11 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.client.impl;
+import org.apache.ratis.client.ClientRetryEvent;
import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.GroupMismatchException;
@@ -82,9 +83,11 @@ public interface UnorderedAsync {
return;
}
RetryPolicy retryPolicy = client.getRetryPolicy();
- if (!retryPolicy.shouldRetry(attemptCount, request)) {
- f.completeExceptionally(
- client.noMoreRetries(request, attemptCount, replyException !=
null? replyException: e));
+ final ClientRetryEvent event = new ClientRetryEvent(attemptCount,
request,
+ replyException != null? replyException: e);
+ final RetryPolicy.Action action =
retryPolicy.handleAttemptFailure(event);
+ if (!action.shouldRetry()) {
+ f.completeExceptionally(client.noMoreRetries(event));
return;
}
@@ -116,10 +119,10 @@ public interface UnorderedAsync {
}
LOG.debug("schedule retry for attempt #{}, policy={}, request={}",
attemptCount, retryPolicy, request);
- client.getScheduler().onTimeout(retryPolicy.getSleepTime(attemptCount,
request),
+ client.getScheduler().onTimeout(action.getSleepTime(),
() -> sendRequestWithRetry(pending, client), LOG, () -> clientId +
": Failed~ to retry " + request);
} catch (Throwable t) {
- LOG.error(clientId + ": XXX Failed " + request, t);
+ LOG.error(clientId + ": Failed " + request, t);
f.completeExceptionally(t);
}
});
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/retry/RequestTypeDependentRetryPolicy.java
b/ratis-client/src/main/java/org/apache/ratis/client/retry/RequestTypeDependentRetryPolicy.java
new file mode 100644
index 0000000..c4d7523
--- /dev/null
+++
b/ratis-client/src/main/java/org/apache/ratis/client/retry/RequestTypeDependentRetryPolicy.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.retry;
+
+import org.apache.ratis.client.ClientRetryEvent;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.Preconditions;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * A {@link org.apache.ratis.protocol.RaftClientRequest.Type} dependent {@link RetryPolicy}
+ * such that each type can be set to use an individual policy.
+ * When the policy is not set for a particular type,
+ * the {@link RetryPolicies#retryForeverNoSleep()} policy is used as the default.
+ * The default policy is also used for any event which is not a {@link ClientRetryEvent}.
+ */
+public final class RequestTypeDependentRetryPolicy implements RetryPolicy {
+  /** To build {@link RequestTypeDependentRetryPolicy} objects. */
+  public static class Builder {
+    private final EnumMap<RaftProtos.RaftClientRequestProto.TypeCase, RetryPolicy> map
+        = new EnumMap<>(RaftProtos.RaftClientRequestProto.TypeCase.class);
+
+    /**
+     * Set the given policy for the given type.
+     * It is asserted that no policy was previously set for the same type.
+     *
+     * @return this builder.
+     */
+    public Builder set(RaftProtos.RaftClientRequestProto.TypeCase type, RetryPolicy policy) {
+      final RetryPolicy previous = map.put(type, policy);
+      Preconditions.assertNull(previous, () -> "The type " + type + " is already set to " + previous);
+      return this;
+    }
+
+    /** @return a new policy using a snapshot of the policies set so far. */
+    public RequestTypeDependentRetryPolicy build() {
+      return new RequestTypeDependentRetryPolicy(map);
+    }
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  private final Map<RaftProtos.RaftClientRequestProto.TypeCase, RetryPolicy> map;
+  private final Supplier<String> myString;
+
+  private RequestTypeDependentRetryPolicy(EnumMap<RaftProtos.RaftClientRequestProto.TypeCase, RetryPolicy> map) {
+    // Copy the given map: the builder keeps its own reference, so wrapping it without copying
+    // would let later Builder.set(..) calls change a policy which has already been built.
+    final Map<RaftProtos.RaftClientRequestProto.TypeCase, RetryPolicy> snapshot
+        = Collections.unmodifiableMap(new EnumMap<>(map));
+    this.map = snapshot;
+    this.myString = () -> {
+      final StringBuilder b = new StringBuilder(getClass().getSimpleName()).append("{");
+      snapshot.forEach((key, value) -> b.append(key).append("->").append(value).append(", "));
+      if (!snapshot.isEmpty()) {
+        // Remove the trailing ", "; for an empty map there is no separator to remove.
+        b.setLength(b.length() - 2);
+      }
+      return b.append("}").toString();
+    };
+  }
+
+  /**
+   * Delegate to the policy set for the request type of the given event.
+   *
+   * @param event the failed event.
+   * @return the action returned by the policy set for the request type if the given event
+   *         is a {@link ClientRetryEvent} and a policy is set for its request type;
+   *         otherwise, the action returned by {@link RetryPolicies#retryForeverNoSleep()}.
+   */
+  @Override
+  public Action handleAttemptFailure(Event event) {
+    if (!(event instanceof ClientRetryEvent)) {
+      return RetryPolicies.retryForeverNoSleep().handleAttemptFailure(event);
+    }
+    final ClientRetryEvent clientEvent = (ClientRetryEvent) event;
+    return Optional.ofNullable(map.get(clientEvent.getRequest().getType().getTypeCase()))
+        .orElse(RetryPolicies.retryForeverNoSleep())
+        .handleAttemptFailure(event);
+  }
+
+  @Override
+  public String toString() {
+    return myString.get();
+  }
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
index 7f59fc8..4518c11 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicies.java
@@ -17,15 +17,11 @@
*/
package org.apache.ratis.retry;
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.function.Supplier;
/**
* A collection of {@link RetryPolicy} implementations
@@ -60,8 +56,8 @@ public interface RetryPolicies {
private RetryForeverNoSleep() {}
@Override
- public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
- return true;
+ public Action handleAttemptFailure(Event event) {
+ return RetryPolicy.RETRY_WITHOUT_SLEEP_ACTION;
}
@Override
@@ -74,8 +70,8 @@ public interface RetryPolicies {
private NoRetry() {}
@Override
- public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
- return false;
+ public Action handleAttemptFailure(Event event) {
+ return RetryPolicy.NO_RETRY_ACTION;
}
@Override
@@ -89,18 +85,13 @@ public interface RetryPolicies {
private final TimeDuration sleepTime;
private RetryForeverWithSleep(TimeDuration sleepTime) {
- Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " +
sleepTime.getDuration() + " < 0");
+ Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " +
sleepTime + " < 0");
this.sleepTime = sleepTime;
}
@Override
- public TimeDuration getSleepTime(int attemptCount, RaftClientRequest
request) {
- return sleepTime;
- }
-
- @Override
- public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
- return true;
+ public Action handleAttemptFailure(Event event) {
+ return () -> sleepTime;
}
@Override
@@ -110,28 +101,20 @@ public interface RetryPolicies {
}
/** For any requests, keep retrying a limited number of attempts with a
fixed sleep time between attempts. */
- class RetryLimited implements RetryPolicy {
+ class RetryLimited extends RetryForeverWithSleep {
private final int maxAttempts;
- private final TimeDuration sleepTime;
-
- private String myString;
+ private final Supplier<String> myString;
private RetryLimited(int maxAttempts, TimeDuration sleepTime) {
+ super(sleepTime);
+
if (maxAttempts < 0) {
throw new IllegalArgumentException("maxAttempts = " + maxAttempts+" <
0");
}
- if (sleepTime.isNegative()) {
- throw new IllegalArgumentException(
- "sleepTime = " + sleepTime.getDuration() + " < 0");
- }
this.maxAttempts = maxAttempts;
- this.sleepTime = sleepTime;
- }
-
- @Override
- public TimeDuration getSleepTime(int attemptCount, RaftClientRequest
request) {
- return shouldRetry(attemptCount, request) ? sleepTime: ZERO_MILLIS;
+ this.myString = JavaUtils.memoize(() -> getClass().getSimpleName()
+ + "(maxAttempts=" + maxAttempts + ", sleepTime=" + sleepTime + ")");
}
public int getMaxAttempts() {
@@ -139,73 +122,13 @@ public interface RetryPolicies {
}
@Override
- public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
- return attemptCount < maxAttempts;
- }
-
- @Override
- public String toString() {
- if (myString == null) {
- myString = getClass().getSimpleName() + "(maxAttempts=" + maxAttempts
- + ", sleepTime=" + sleepTime + ")";
- }
- return myString;
- }
- }
-
- /**
- * A {@link RaftClientRequest.Type} dependent {@link RetryPolicy}
- * such that each type can be set to use an individual policy.
- * When the policy is not set for a particular type,
- * the {@link #retryForeverNoSleep()} policy is used as the default.
- */
- class RequestTypeDependentRetry implements RetryPolicy {
- public static class Builder {
- private final EnumMap<RaftClientRequestProto.TypeCase, RetryPolicy> map
- = new EnumMap<>(RaftClientRequestProto.TypeCase.class);
-
- /** Set the given policy for the given type. */
- public Builder set(RaftClientRequestProto.TypeCase type, RetryPolicy
policy) {
- final RetryPolicy previous = map.put(type, policy);
- Preconditions.assertNull(previous, () -> "The type " + type + " is
already set to " + previous);
- return this;
- }
-
- public RequestTypeDependentRetry build() {
- return new RequestTypeDependentRetry(map);
- }
- }
-
- public static Builder newBuilder() {
- return new Builder();
- }
-
- private final Map<RaftClientRequestProto.TypeCase, RetryPolicy> map;
-
- private RequestTypeDependentRetry(EnumMap<RaftClientRequestProto.TypeCase,
RetryPolicy> map) {
- this.map = Collections.unmodifiableMap(map);
- }
-
- @Override
- public boolean shouldRetry(int attemptCount, RaftClientRequest request) {
- return Optional.ofNullable(map.get(request.getType().getTypeCase()))
- .orElse(retryForeverNoSleep())
- .shouldRetry(attemptCount, request);
- }
-
- @Override
- public TimeDuration getSleepTime(int attemptCount, RaftClientRequest
request) {
- return Optional.ofNullable(map.get(request.getType().getTypeCase()))
- .orElse(retryForeverNoSleep())
- .getSleepTime(attemptCount, request);
+ public Action handleAttemptFailure(Event event) {
+ return event.getAttemptCount() < maxAttempts?
super.handleAttemptFailure(event): NO_RETRY_ACTION;
}
@Override
public String toString() {
- final StringBuilder b = new
StringBuilder(getClass().getSimpleName()).append("{");
- map.forEach((key, value) ->
b.append(key).append("->").append(value).append(", "));
- b.setLength(b.length() - 2);
- return b.append("}").toString();
+ return myString.get();
}
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
index 162f760..4137982 100644
--- a/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
+++ b/ratis-common/src/main/java/org/apache/ratis/retry/RetryPolicy.java
@@ -17,31 +17,47 @@
*/
package org.apache.ratis.retry;
-import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.util.TimeDuration;
-import java.util.concurrent.TimeUnit;
-
/**
* Policy abstract for retrying.
*/
public interface RetryPolicy {
- TimeDuration ZERO_MILLIS = TimeDuration.valueOf(0, TimeUnit.MILLISECONDS);
+ Action NO_RETRY_ACTION = new Action() {
+ @Override
+ public boolean shouldRetry() {
+ return false;
+ }
+ @Override
+ public TimeDuration getSleepTime() {
+ return TimeDuration.ZERO;
+ }
+ };
+
+ Action RETRY_WITHOUT_SLEEP_ACTION = () -> TimeDuration.ZERO;
+
+ /** The action it should take. */
+ interface Action {
+ /** @return true if it has to make another attempt; otherwise, return
false. */
+ default boolean shouldRetry() {
+ return true;
+ }
+
+ /** @return the sleep time period before the next attempt. */
+ TimeDuration getSleepTime();
+ }
+
+ /** The event triggered the failure. */
+ interface Event {
+ /** @return the number of attempts tried so far. */
+ int getAttemptCount();
+ }
/**
* Determines whether it is supposed to retry after the operation has failed.
*
- * @param attemptCount The number of times attempted so far.
- * @param request The failed request.
- * @return true if it has to make another attempt; otherwise, return false.
+ * @param event The failed event.
+ * @return the action it should take.
*/
- boolean shouldRetry(int attemptCount, RaftClientRequest request);
-
- /**
- * @param attemptCount The number of times attempted so far.
- * @return the {@link TimeDuration} to sleep in between the retries.
- */
- default TimeDuration getSleepTime(int attemptCount, RaftClientRequest
request) {
- return ZERO_MILLIS;
- }
+ Action handleAttemptFailure(Event event);
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 49aea5e..90dbe60 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -17,6 +17,8 @@
*/
package org.apache.ratis.util;
+import org.apache.ratis.util.function.CheckedBiFunction;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -34,6 +36,7 @@ import java.util.function.LongUnaryOperator;
public final class TimeDuration implements Comparable<TimeDuration> {
public static final TimeDuration ZERO = valueOf(0, TimeUnit.NANOSECONDS);
public static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1,
TimeUnit.SECONDS);
+ public static final TimeDuration ONE_MINUTE = TimeDuration.valueOf(1,
TimeUnit.MINUTES);
/** Abbreviations of {@link TimeUnit}. */
public enum Abbreviation {
@@ -201,6 +204,12 @@ public final class TimeDuration implements
Comparable<TimeDuration> {
return valueOf(operator.applyAsLong(duration), unit);
}
+ /** Apply the given function to the (duration, unit) of this object. */
+ public <OUTPUT, THROWABLE extends Throwable> OUTPUT apply(
+ CheckedBiFunction<Long, TimeUnit, OUTPUT, THROWABLE> function) throws
THROWABLE {
+ return function.apply(getDuration(), getUnit());
+ }
+
/** @return Is this {@link TimeDuration} negative? */
public boolean isNegative() {
return duration < 0;
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedBiFunction.java
b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedBiFunction.java
new file mode 100644
index 0000000..24df140
--- /dev/null
+++
b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedBiFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util.function;
+
+/**
+ * BiFunction with a throws-clause.
+ *
+ * @param <LEFT> the type of the first argument.
+ * @param <RIGHT> the type of the second argument.
+ * @param <OUTPUT> the type of the result.
+ * @param <THROWABLE> the type of the {@link Throwable} declared in the throws-clause.
+ */
+@FunctionalInterface
+public interface CheckedBiFunction<LEFT, RIGHT, OUTPUT, THROWABLE extends
Throwable> {
+ /**
+ * The same as {@link java.util.function.BiFunction#apply(Object, Object)}
+ * except that this method is declared with a throws-clause.
+ *
+ * @param left the first argument.
+ * @param right the second argument.
+ * @return the result.
+ * @throws THROWABLE the throwable type declared by the implementation.
+ */
+ OUTPUT apply(LEFT left, RIGHT right) throws THROWABLE;
+}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index fc1f100..f44ec32 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -29,7 +29,6 @@ import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.apache.ratis.protocol.StateMachineException;
@@ -53,14 +52,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
@@ -262,7 +260,7 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
@Test
public void testWithLoadAsync() throws Exception {
runWithNewCluster(NUM_SERVERS,
- cluster -> RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG));
+ cluster -> RaftBasicTests.testWithLoad(5, 500, true, cluster, LOG));
}
@Test
@@ -403,30 +401,38 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(),
oldExpiryTime);
}
- @Test(timeout = 30000)
+ @Test
public void testNoRetryWaitOnNotLeaderException() throws Exception {
- final MiniRaftCluster cluster = newCluster(3);
- cluster.initServers();
- cluster.start();
+
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(),
false);
+ runWithNewCluster(3, this::runTestNoRetryWaitOnNotLeaderException);
+
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(),
true);
+ }
+ private void runTestNoRetryWaitOnNotLeaderException(MiniRaftCluster cluster)
throws Exception {
final RaftServerImpl leader = waitForLeader(cluster);
- // Order peers before leaders to try
- List<RaftPeerId> peers = cluster.getPeers().stream()
- .filter(p -> !p.getId().equals(leader.getId()))
- .map(RaftPeer::getId).collect(Collectors.toList());
-
- Assert.assertNotNull(peers);
- Assert.assertEquals(2, peers.size());
- Iterator<RaftPeerId> i = peers.listIterator();
- RetryPolicy unlimitedRetry =
- RetryPolicies.retryUpToMaximumCountWithFixedSleep(10,
TimeDuration.valueOf(60, TimeUnit.SECONDS));
-
- RaftPeerId first = i.next();
- RaftPeerId second = i.next();
- try (final RaftClient client = cluster.createClient(first,
cluster.getGroup(), unlimitedRetry)) {
- client.sendAsync(new SimpleMessage("abc")).get();
- } finally {
- cluster.shutdown();
+ final List<RaftServerImpl> followers = cluster.getFollowers();
+ Assert.assertNotNull(followers);
+ Assert.assertEquals(2, followers.size());
+ Assert.assertNotSame(leader, followers.get(0));
+ Assert.assertNotSame(leader, followers.get(1));
+
+ // send a message to make sure that the leader is ready
+ try (final RaftClient client = cluster.createClient(leader.getId())) {
+ final CompletableFuture<RaftClientReply> f = client.sendAsync(new
SimpleMessage("first"));
+ FIVE_SECONDS.apply(f::get);
+ }
+
+ final RetryPolicy r = event -> () -> {
+ final IllegalStateException e = new IllegalStateException("Unexpected
getSleepTime: " + event);
+ setFirstException(e);
+ throw e;
+ };
+
+ try (final RaftClient client =
cluster.createClient(followers.get(0).getId(), cluster.getGroup(), r)) {
+ final CompletableFuture<RaftClientReply> f = client.sendAsync(new
SimpleMessage("abc"));
+ FIVE_SECONDS.apply(f::get);
+ } catch (TimeoutException e) {
+ throw new AssertionError("Failed to get async result", e);
}
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index a9868ba..3a80c80 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -57,7 +57,7 @@ public abstract class WatchRequestTests<CLUSTER extends
MiniRaftCluster>
}
static final int NUM_SERVERS = 3;
- static final int GET_TIMEOUT_SECOND = 5;
+ static final int GET_TIMEOUT_SECOND = 10;
@Before
public void setup() {
@@ -116,11 +116,10 @@ public abstract class WatchRequestTests<CLUSTER extends
MiniRaftCluster>
static void runTest(CheckedConsumer<TestParameters, Exception> testCase,
MiniRaftCluster cluster, Logger LOG)
throws Exception {
try(final RaftClient client =
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) {
- final int[] numMessages = {1, 10, 100};
- for(int i = 0; i < 5; i++) {
- final int n =
numMessages[ThreadLocalRandom.current().nextInt(numMessages.length)];
+ final int[] numMessages = {1, 10, 20};
+ for(int n : numMessages) {
final TestParameters p = new TestParameters(n, client, cluster, LOG);
- LOG.info("{}) {}, {}", i, p, cluster.printServers());
+ LOG.info("{}) {}, {}", n, p, cluster.printServers());
testCase.accept(p);
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
b/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
index 4049f6b..967a907 100644
--- a/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/TestRetryPolicy.java
@@ -17,6 +17,8 @@
*/
package org.apache.ratis;
+import org.apache.ratis.client.ClientRetryEvent;
+import org.apache.ratis.client.retry.RequestTypeDependentRetryPolicy;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.protocol.ClientId;
@@ -37,21 +39,24 @@ public class TestRetryPolicy extends BaseTest {
final int n = 4;
final TimeDuration sleepTime = HUNDRED_MILLIS;
final RetryPolicy policy =
RetryPolicies.retryUpToMaximumCountWithFixedSleep(n, sleepTime);
- final RaftClientRequest request =
newRaftClientRequest(RaftClientRequest.readRequestType());
for(int i = 1; i < 2*n; i++) {
+ final int attempt = i;
+ final RetryPolicy.Event event = () -> attempt;
+ final RetryPolicy.Action action = policy.handleAttemptFailure(event);
+
final boolean expected = i < n;
- Assert.assertEquals(expected, policy.shouldRetry(i, request));
+ Assert.assertEquals(expected, action.shouldRetry());
if (expected) {
- Assert.assertEquals(sleepTime, policy.getSleepTime(i, request));
+ Assert.assertEquals(sleepTime, action.getSleepTime());
} else {
- Assert.assertEquals(0L, policy.getSleepTime(i, request).getDuration());
+ Assert.assertEquals(0L, action.getSleepTime().getDuration());
}
}
}
@Test
public void testRequestTypeDependentRetry() {
- final RetryPolicies.RequestTypeDependentRetry.Builder b =
RetryPolicies.RequestTypeDependentRetry.newBuilder();
+ final RequestTypeDependentRetryPolicy.Builder b =
RequestTypeDependentRetryPolicy.newBuilder();
final int n = 4;
final TimeDuration writeSleep = HUNDRED_MILLIS;
final RetryPolicies.RetryLimited writePolicy =
RetryPolicies.retryUpToMaximumCountWithFixedSleep(n, writeSleep);
@@ -67,26 +72,37 @@ public class TestRetryPolicy extends BaseTest {
RaftClientRequest.watchRequestType(1, ReplicationLevel.MAJORITY));
for(int i = 1; i < 2*n; i++) {
{ //write
+ final ClientRetryEvent event = new ClientRetryEvent(i, writeRequest);
+ final RetryPolicy.Action action = policy.handleAttemptFailure(event);
+
final boolean expected = i < n;
- Assert.assertEquals(expected, policy.shouldRetry(i, writeRequest));
+ Assert.assertEquals(expected, action.shouldRetry());
if (expected) {
- Assert.assertEquals(writeSleep, policy.getSleepTime(i,
writeRequest));
+ Assert.assertEquals(writeSleep, action.getSleepTime());
} else {
- Assert.assertEquals(0L, policy.getSleepTime(i,
writeRequest).getDuration());
+ Assert.assertEquals(0L, action.getSleepTime().getDuration());
}
}
{ //read and stale read are using default
- Assert.assertTrue(policy.shouldRetry(i, readRequest));
- Assert.assertEquals(0L, policy.getSleepTime(i,
readRequest).getDuration());
+ final ClientRetryEvent event = new ClientRetryEvent(i, readRequest);
+ final RetryPolicy.Action action = policy.handleAttemptFailure(event);
+ Assert.assertTrue(action.shouldRetry());
+ Assert.assertEquals(0L, action.getSleepTime().getDuration());
+ }
- Assert.assertTrue(policy.shouldRetry(i, staleReadRequest));
- Assert.assertEquals(0L, policy.getSleepTime(i,
staleReadRequest).getDuration());
+ {
+ final ClientRetryEvent event = new ClientRetryEvent(i,
staleReadRequest);
+ final RetryPolicy.Action action = policy.handleAttemptFailure(event);
+ Assert.assertTrue(action.shouldRetry());
+ Assert.assertEquals(0L, action.getSleepTime().getDuration());
}
{ //watch has no retry
- Assert.assertFalse(policy.shouldRetry(i, watchRequest));
- Assert.assertEquals(0L, policy.getSleepTime(i,
watchRequest).getDuration());
+ final ClientRetryEvent event = new ClientRetryEvent(i, watchRequest);
+ final RetryPolicy.Action action = policy.handleAttemptFailure(event);
+ Assert.assertFalse(action.shouldRetry());
+ Assert.assertEquals(0L, action.getSleepTime().getDuration());
}
}