This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 55cbfbb RATIS-726. TimeoutScheduler holds on to the raftClientRequest
till it times out even though request succeeds. Contributed by Tsz-wo Sze.
55cbfbb is described below
commit 55cbfbbca68aca531bc261786a34499fce4de700
Author: Shashikant Banerjee <[email protected]>
AuthorDate: Wed Oct 23 17:21:00 2019 +0530
RATIS-726. TimeoutScheduler holds on to the raftClientRequest till it times
out even though request succeeds. Contributed by Tsz-wo Sze.
---
.../org/apache/ratis/client/impl/OrderedAsync.java | 49 ++++++++++++++--------
1 file changed, 31 insertions(+), 18 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index efd26a1..7694450 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -36,43 +36,45 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.SlidingWindow;
+import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongFunction;
/** Send ordered asynchronous requests to a raft service. */
class OrderedAsync {
- static final Logger LOG = LoggerFactory.getLogger(OrderedAsync.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(OrderedAsync.class);
static class PendingOrderedRequest extends PendingClientRequest
implements SlidingWindow.ClientSideRequest<RaftClientReply> {
- private final Function<SlidingWindowEntry, RaftClientRequest>
requestConstructor;
+ private final long callId;
private final long seqNum;
+ private final AtomicReference<Function<SlidingWindowEntry,
RaftClientRequest>> requestConstructor;
private volatile boolean isFirst = false;
- private volatile RaftClientRequest request;
- PendingOrderedRequest(long seqNum, Function<SlidingWindowEntry,
RaftClientRequest> requestConstructor) {
+ PendingOrderedRequest(long callId, long seqNum,
+ Function<SlidingWindowEntry, RaftClientRequest> requestConstructor) {
+ this.callId = callId;
this.seqNum = seqNum;
- this.requestConstructor = requestConstructor;
+ this.requestConstructor = new AtomicReference<>(requestConstructor);
}
@Override
RaftClientRequest newRequestImpl() {
- request =
requestConstructor.apply(ProtoUtils.toSlidingWindowEntry(seqNum, isFirst));
- return request;
- }
-
- RaftClientRequest getRequest() {
- return request;
+ return Optional.ofNullable(requestConstructor.get())
+ .map(f -> f.apply(ProtoUtils.toSlidingWindowEntry(seqNum, isFirst)))
+ .orElse(null);
}
@Override
@@ -92,17 +94,19 @@ class OrderedAsync {
@Override
public void setReply(RaftClientReply reply) {
+ requestConstructor.set(null);
getReplyFuture().complete(reply);
}
@Override
public void fail(Throwable e) {
+ requestConstructor.set(null);
getReplyFuture().completeExceptionally(e);
}
@Override
public String toString() {
- return "[seq=" + getSeqNum() + "]";
+ return "[cid=" + callId + ", seq=" + getSeqNum() + "]";
}
}
@@ -150,7 +154,7 @@ class OrderedAsync {
}
final long callId = RaftClientImpl.nextCallId();
- final LongFunction<PendingOrderedRequest> constructor = seqNum -> new
PendingOrderedRequest(seqNum,
+ final LongFunction<PendingOrderedRequest> constructor = seqNum -> new
PendingOrderedRequest(callId, seqNum,
slidingWindowEntry -> client.newRaftClientRequest(server, callId,
message, type, slidingWindowEntry));
return getSlidingWindow(server).submitNewRequest(constructor,
this::sendRequestWithRetry
).getReplyFuture(
@@ -159,13 +163,17 @@ class OrderedAsync {
}
private void sendRequestWithRetry(PendingOrderedRequest pending) {
- final RetryPolicy retryPolicy = client.getRetryPolicy();
final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
if (f.isDone()) {
return;
}
- RaftClientRequest request = pending.newRequestImpl();
+ final RaftClientRequest request = pending.newRequestImpl();
+ if (request == null) { // already done
+ return;
+ }
+
+ final RetryPolicy retryPolicy = client.getRetryPolicy();
sendRequest(pending).thenAccept(reply -> {
if (f.isDone()) {
return;
@@ -191,9 +199,14 @@ class OrderedAsync {
private void scheduleWithTimeout(PendingOrderedRequest pending,
RaftClientRequest request, RetryPolicy retryPolicy) {
final int attempt = pending.getAttemptCount();
LOG.debug("schedule* attempt #{} with policy {} for {}", attempt,
retryPolicy, request);
- client.getScheduler().onTimeout(retryPolicy.getSleepTime(attempt, request),
- () -> getSlidingWindow(request).retry(pending,
this::sendRequestWithRetry),
- LOG, () -> "Failed* to retry " + request);
+ final TimeDuration sleepTime = retryPolicy.getSleepTime(attempt, request);
+ scheduleWithTimeout(pending, request.getServerId(), sleepTime);
+ }
+
+ private void scheduleWithTimeout(PendingOrderedRequest pending, RaftPeerId
serverId, TimeDuration sleepTime) {
+ client.getScheduler().onTimeout(sleepTime,
+ () -> getSlidingWindow(serverId).retry(pending,
this::sendRequestWithRetry),
+ LOG, () -> "Failed* to retry " + pending);
}
private CompletableFuture<RaftClientReply> sendRequest(PendingOrderedRequest
pending) {