This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 231b35c  RATIS-565. Move the ordered async client code to a new class. 
Contributed by Tsz Wo Nicholas Sze.
231b35c is described below

commit 231b35c23d44a0afb889f0c6bb5ba9194b3cf398
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed May 29 14:27:47 2019 +0530

    RATIS-565. Move the ordered async client code to a new class. Contributed 
by Tsz Wo Nicholas Sze.
---
 .../org/apache/ratis/client/impl/OrderedAsync.java | 216 +++++++++++++++++++++
 .../apache/ratis/client/impl/RaftClientImpl.java   | 204 ++++---------------
 .../ratis/client/impl/RaftClientTestUtil.java      |   5 +-
 .../apache/ratis/client/impl/UnorderedAsync.java   |  10 +-
 4 files changed, 259 insertions(+), 176 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
new file mode 100644
index 0000000..4b5991c
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
+import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
+import org.apache.ratis.protocol.GroupMismatchException;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftException;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.SlidingWindow;
+import org.apache.ratis.util.function.FunctionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Semaphore;
+import java.util.function.Function;
+import java.util.function.LongFunction;
+
+/** Send ordered asynchronous requests to a raft service. */
+class OrderedAsync {
+  static final Logger LOG = LoggerFactory.getLogger(OrderedAsync.class);
+
+  static class PendingOrderedRequest extends PendingClientRequest
+      implements SlidingWindow.ClientSideRequest<RaftClientReply> {
+    private final Function<SlidingWindowEntry, RaftClientRequest> 
requestConstructor;
+    private final long seqNum;
+    private volatile boolean isFirst = false;
+
+    PendingOrderedRequest(long seqNum, Function<SlidingWindowEntry, 
RaftClientRequest> requestConstructor) {
+      this.seqNum = seqNum;
+      this.requestConstructor = requestConstructor;
+    }
+
+    @Override
+    RaftClientRequest newRequestImpl() {
+      return requestConstructor.apply(ProtoUtils.toSlidingWindowEntry(seqNum, 
isFirst));
+    }
+
+    @Override
+    public void setFirstRequest() {
+      isFirst = true;
+    }
+
+    @Override
+    public long getSeqNum() {
+      return seqNum;
+    }
+
+    @Override
+    public boolean hasReply() {
+      return getReplyFuture().isDone();
+    }
+
+    @Override
+    public void setReply(RaftClientReply reply) {
+      getReplyFuture().complete(reply);
+    }
+
+    @Override
+    public void fail(Throwable e) {
+      getReplyFuture().completeExceptionally(e);
+    }
+
+    @Override
+    public String toString() {
+      return "[seq=" + getSeqNum() + "]";
+    }
+  }
+
+  private final RaftClientImpl client;
+  /** Map: id -> {@link SlidingWindow}, in order to support async calls to the 
Raft service or individual servers. */
+  private final ConcurrentMap<String, 
SlidingWindow.Client<PendingOrderedRequest, RaftClientReply>> slidingWindows
+      = new ConcurrentHashMap<>();
+  private final Semaphore requestSemaphore;
+
+  OrderedAsync(RaftClientImpl client, RaftProperties properties) {
+    this.client = Objects.requireNonNull(client, "client == null");
+    this.requestSemaphore = new 
Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
+  }
+
+  private void resetSlidingWindow(RaftClientRequest request) {
+    getSlidingWindow(request).resetFirstSeqNum();
+  }
+
+  private SlidingWindow.Client<PendingOrderedRequest, RaftClientReply> 
getSlidingWindow(RaftClientRequest request) {
+    return getSlidingWindow(request.is(TypeCase.STALEREAD) ? 
request.getServerId() : null);
+  }
+
+  private SlidingWindow.Client<PendingOrderedRequest, RaftClientReply> 
getSlidingWindow(RaftPeerId target) {
+    final String id = target != null ? target.toString() : "RAFT";
+    return slidingWindows.computeIfAbsent(id, key -> new 
SlidingWindow.Client<>(client.getId() + "->" + key));
+  }
+
+  private void failAllAsyncRequests(RaftClientRequest request, Throwable t) {
+    
getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), t);
+  }
+
+  private void handleAsyncRetryFailure(RaftClientRequest request, int 
attemptCount, Throwable throwable) {
+    failAllAsyncRequests(request, client.noMoreRetries(request, attemptCount, 
throwable));
+  }
+
+  CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, Message 
message, RaftPeerId server) {
+    if (!type.is(TypeCase.WATCH)) {
+      Objects.requireNonNull(message, "message == null");
+    }
+    try {
+      requestSemaphore.acquire();
+    } catch (InterruptedException e) {
+      return JavaUtils.completeExceptionally(IOUtils.toInterruptedIOException(
+          "Interrupted when sending " + type + ", message=" + message, e));
+    }
+
+    final long callId = RaftClientImpl.nextCallId();
+    final LongFunction<PendingOrderedRequest> constructor = seqNum -> new 
PendingOrderedRequest(seqNum,
+        slidingWindowEntry -> client.newRaftClientRequest(server, callId, 
message, type, slidingWindowEntry));
+    return getSlidingWindow(server).submitNewRequest(constructor, 
this::sendRequestWithRetry
+    ).getReplyFuture(
+    ).thenApply(reply -> RaftClientImpl.handleRaftException(reply, 
CompletionException::new)
+    ).whenComplete((r, e) -> requestSemaphore.release());
+  }
+
+  private void sendRequestWithRetry(PendingOrderedRequest pending) {
+    final RetryPolicy retryPolicy = client.getRetryPolicy();
+    final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
+    if (f.isDone()) {
+      return;
+    }
+
+    final RaftClientRequest request = pending.newRequest();
+    sendRequest(request, pending.getAttemptCount()).thenAccept(reply -> {
+      if (f.isDone()) {
+        return;
+      }
+      if (reply == null) {
+        final int attempt = pending.getAttemptCount();
+        LOG.debug("schedule* attempt #{} with policy {} for {}", attempt, 
retryPolicy, request);
+        client.getScheduler().onTimeout(retryPolicy.getSleepTime(attempt, 
request),
+            () -> getSlidingWindow(request).retry(pending, 
this::sendRequestWithRetry),
+            LOG, () -> "Failed* to retry " + request);
+      } else {
+        f.complete(reply);
+      }
+    
}).exceptionally(FunctionUtils.consumerAsNullFunction(f::completeExceptionally));
+  }
+
+  private CompletableFuture<RaftClientReply> sendRequest(RaftClientRequest 
request, int attemptCount) {
+    final RetryPolicy retryPolicy = client.getRetryPolicy();
+    LOG.debug("{}: send* {}", client.getId(), request);
+    return client.getClientRpc().sendRequestAsync(request).thenApply(reply -> {
+      LOG.debug("{}: receive* {}", client.getId(), reply);
+      final RaftException replyException = reply != null? 
reply.getException(): null;
+      reply = client.handleNotLeaderException(request, reply, 
this::resetSlidingWindow);
+      if (reply != null) {
+        getSlidingWindow(request).receiveReply(
+            request.getSlidingWindowEntry().getSeqNum(), reply, 
this::sendRequestWithRetry);
+      } else if (!retryPolicy.shouldRetry(attemptCount, request)) {
+        handleAsyncRetryFailure(request, attemptCount, replyException);
+      }
+      return reply;
+    }).exceptionally(e -> {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(client.getId() + ": Failed* " + request, e);
+      } else {
+        LOG.debug("{}: Failed* {} with {}", client.getId(), request, e);
+      }
+      e = JavaUtils.unwrapCompletionException(e);
+      if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
+        if (!retryPolicy.shouldRetry(attemptCount, request)) {
+          handleAsyncRetryFailure(request, attemptCount, e);
+        } else {
+          client.handleIOException(request, (IOException) e, null, 
this::resetSlidingWindow);
+        }
+        return null;
+      }
+      failAllAsyncRequests(request, e);
+      return null;
+    });
+  }
+
+  void assertRequestSemaphore(int expectedAvailablePermits, int 
expectedQueueLength) {
+    Preconditions.assertTrue(requestSemaphore.availablePermits() == 
expectedAvailablePermits);
+    Preconditions.assertTrue(requestSemaphore.getQueueLength() == 
expectedQueueLength);
+  }
+}
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 024f6c7..a31da14 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -21,27 +21,31 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.util.*;
-import org.apache.ratis.util.function.FunctionUtils;
+import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeoutScheduler;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.LongFunction;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 
-import static 
org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.STALEREAD;
-import static 
org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.WATCH;
-
 /** A client who sends requests to a raft service. */
 final class RaftClientImpl implements RaftClient {
   private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
@@ -70,53 +74,6 @@ final class RaftClientImpl implements RaftClient {
     }
   }
 
-  static class PendingAsyncRequest extends PendingClientRequest
-      implements SlidingWindow.ClientSideRequest<RaftClientReply> {
-    private final Function<SlidingWindowEntry, RaftClientRequest> 
requestConstructor;
-    private final long seqNum;
-    private volatile boolean isFirst = false;
-
-    PendingAsyncRequest(long seqNum, Function<SlidingWindowEntry, 
RaftClientRequest> requestConstructor) {
-      this.seqNum = seqNum;
-      this.requestConstructor = requestConstructor;
-    }
-
-    @Override
-    RaftClientRequest newRequestImpl() {
-      return requestConstructor.apply(ProtoUtils.toSlidingWindowEntry(seqNum, 
isFirst));
-    }
-
-    @Override
-    public void setFirstRequest() {
-      isFirst = true;
-    }
-
-    @Override
-    public long getSeqNum() {
-      return seqNum;
-    }
-
-    @Override
-    public boolean hasReply() {
-      return getReplyFuture().isDone();
-    }
-
-    @Override
-    public void setReply(RaftClientReply reply) {
-      getReplyFuture().complete(reply);
-    }
-
-    @Override
-    public void fail(Throwable e) {
-      getReplyFuture().completeExceptionally(e);
-    }
-
-    @Override
-    public String toString() {
-      return "[seq=" + getSeqNum() + "]";
-    }
-  }
-
   private final ClientId clientId;
   private final RaftClientRpc clientRpc;
   private final Collection<RaftPeer> peers;
@@ -125,11 +82,9 @@ final class RaftClientImpl implements RaftClient {
 
   private volatile RaftPeerId leaderId;
 
-  /** Map: id -> {@link SlidingWindow}, in order to support async calls to the 
RAFT service or individual servers. */
-  private final ConcurrentMap<String, 
SlidingWindow.Client<PendingAsyncRequest, RaftClientReply>>
-      slidingWindows = new ConcurrentHashMap<>();
   private final TimeoutScheduler scheduler;
-  private final Semaphore asyncRequestSemaphore;
+
+  private final Supplier<OrderedAsync> orderedAsync;
 
   RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,
       RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy 
retryPolicy) {
@@ -142,9 +97,10 @@ final class RaftClientImpl implements RaftClient {
     Preconditions.assertTrue(retryPolicy != null, "retry policy can't be 
null");
     this.retryPolicy = retryPolicy;
 
-    asyncRequestSemaphore = new 
Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
     scheduler = 
TimeoutScheduler.newInstance(RaftClientConfigKeys.Async.schedulerThreads(properties));
     clientRpc.addServers(peers);
+
+    this.orderedAsync = JavaUtils.memoize(() -> new OrderedAsync(this, 
properties));
   }
 
   @Override
@@ -160,13 +116,8 @@ final class RaftClientImpl implements RaftClient {
     return scheduler;
   }
 
-  private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> 
getSlidingWindow(RaftClientRequest request) {
-    return getSlidingWindow(request.is(STALEREAD)? request.getServerId(): 
null);
-  }
-
-  private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> 
getSlidingWindow(RaftPeerId target) {
-    final String id = target != null? target.toString(): "RAFT";
-    return slidingWindows.computeIfAbsent(id, key -> new 
SlidingWindow.Client<>(getId() + "->" + key));
+  OrderedAsync getOrderedAsync() {
+    return orderedAsync.get();
   }
 
   @Override
@@ -191,23 +142,7 @@ final class RaftClientImpl implements RaftClient {
 
   private CompletableFuture<RaftClientReply> sendAsync(
       RaftClientRequest.Type type, Message message, RaftPeerId server) {
-    if (!type.is(WATCH)) {
-      Objects.requireNonNull(message, "message == null");
-    }
-    try {
-      asyncRequestSemaphore.acquire();
-    } catch (InterruptedException e) {
-      return JavaUtils.completeExceptionally(IOUtils.toInterruptedIOException(
-          "Interrupted when sending " + type + ", message=" + message, e));
-    }
-
-    final long callId = nextCallId();
-    final LongFunction<PendingAsyncRequest> constructor = seqNum -> new 
PendingAsyncRequest(seqNum,
-        slidingWindowEntry -> newRaftClientRequest(server, callId, message, 
type, slidingWindowEntry));
-    return getSlidingWindow(server).submitNewRequest(constructor, 
this::sendRequestWithRetryAsync
-    ).getReplyFuture(
-    ).thenApply(reply -> handleRaftException(reply, CompletionException::new)
-    ).whenComplete((r, e) -> asyncRequestSemaphore.release());
+    return getOrderedAsync().send(type, message, server);
   }
 
   RaftClientRequest newRaftClientRequest(
@@ -240,7 +175,7 @@ final class RaftClientImpl implements RaftClient {
 
   private RaftClientReply send(RaftClientRequest.Type type, Message message, 
RaftPeerId server)
       throws IOException {
-    if (!type.is(WATCH)) {
+    if (!type.is(TypeCase.WATCH)) {
       Objects.requireNonNull(message, "message == null");
     }
 
@@ -304,29 +239,6 @@ final class RaftClientImpl implements RaftClient {
         peersInNewConf.filter(p -> !peers.contains(p))::iterator);
   }
 
-  private void sendRequestWithRetryAsync(PendingAsyncRequest pending) {
-    final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
-    if (f.isDone()) {
-      return;
-    }
-
-    final RaftClientRequest request = pending.newRequest();
-    sendRequestAsync(request, pending.getAttemptCount()).thenAccept(reply -> {
-      if (f.isDone()) {
-        return;
-      }
-      if (reply == null) {
-        final int attempt = pending.getAttemptCount();
-        LOG.debug("schedule* attempt #{} with policy {} for {}", attempt, 
retryPolicy, request);
-        scheduler.onTimeout(retryPolicy.getSleepTime(attempt, request),
-            () -> getSlidingWindow(request).retry(pending, 
this::sendRequestWithRetryAsync),
-            LOG, () -> "Failed* to retry " + request);
-      } else {
-        f.complete(reply);
-      }
-    
}).exceptionally(FunctionUtils.consumerAsNullFunction(f::completeExceptionally));
-  }
-
   private RaftClientReply sendRequestWithRetry(Supplier<RaftClientRequest> 
supplier) throws IOException {
     for(int attemptCount = 1;; attemptCount++) {
       final RaftClientRequest request = supplier.get();
@@ -342,7 +254,7 @@ final class RaftClientImpl implements RaftClient {
         ioe = e;
       }
       if (!retryPolicy.shouldRetry(attemptCount, request)) {
-        throw (IOException)noMoreRetries(request, attemptCount, retryPolicy, 
ioe);
+        throw (IOException)noMoreRetries(request, attemptCount, ioe);
       }
 
       try {
@@ -353,53 +265,11 @@ final class RaftClientImpl implements RaftClient {
     }
   }
 
-  private CompletableFuture<RaftClientReply> sendRequestAsync(
-      RaftClientRequest request, int attemptCount) {
-    LOG.debug("{}: send* {}", clientId, request);
-    return clientRpc.sendRequestAsync(request).thenApply(reply -> {
-      LOG.debug("{}: receive* {}", clientId, reply);
-      final RaftException replyException = reply != null? 
reply.getException(): null;
-      reply = handleNotLeaderException(request, reply, true);
-      if (reply != null) {
-        getSlidingWindow(request).receiveReply(
-            request.getSlidingWindowEntry().getSeqNum(), reply, 
this::sendRequestWithRetryAsync);
-      } else if (!retryPolicy.shouldRetry(attemptCount, request)) {
-        handleAsyncRetryFailure(request, attemptCount, replyException);
-      }
-      return reply;
-    }).exceptionally(e -> {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(clientId + ": Failed* " + request, e);
-      } else {
-        LOG.debug("{}: Failed* {} with {}", clientId, request, e);
-      }
-      e = JavaUtils.unwrapCompletionException(e);
-      if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
-        if (!retryPolicy.shouldRetry(attemptCount, request)) {
-          handleAsyncRetryFailure(request, attemptCount, e);
-        } else {
-          handleIOException(request, (IOException) e, null, true);
-        }
-        return null;
-      }
-      failAllAsyncRequests(request, e);
-      return null;
-    });
-  }
-
-  static Throwable noMoreRetries(RaftClientRequest request, int attemptCount, 
RetryPolicy policy, Throwable throwable) {
+  Throwable noMoreRetries(RaftClientRequest request, int attemptCount, 
Throwable throwable) {
     if (attemptCount == 1 && throwable != null) {
       return throwable;
     }
-    return new RaftRetryFailureException(request, attemptCount, policy, 
throwable);
-  }
-
-  private void handleAsyncRetryFailure(RaftClientRequest request, int 
attemptCount, Throwable throwable) {
-    failAllAsyncRequests(request, noMoreRetries(request, attemptCount, 
retryPolicy, throwable));
-  }
-
-  private void failAllAsyncRequests(RaftClientRequest request, Throwable t) {
-    
getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), t);
+    return new RaftRetryFailureException(request, attemptCount, retryPolicy, 
throwable);
   }
 
   private RaftClientReply sendRequest(RaftClientRequest request) throws 
IOException {
@@ -410,11 +280,11 @@ final class RaftClientImpl implements RaftClient {
     } catch (GroupMismatchException gme) {
       throw gme;
     } catch (IOException ioe) {
-      handleIOException(request, ioe, null, false);
+      handleIOException(request, ioe);
       throw ioe;
     }
     LOG.debug("{}: receive {}", clientId, reply);
-    reply = handleNotLeaderException(request, reply, false);
+    reply = handleNotLeaderException(request, reply, null);
     reply = handleRaftException(reply, Function.identity());
     return reply;
   }
@@ -435,7 +305,7 @@ final class RaftClientImpl implements RaftClient {
    *         otherwise return the same reply.
    */
   RaftClientReply handleNotLeaderException(RaftClientRequest request, 
RaftClientReply reply,
-      boolean resetSlidingWindow) {
+      Consumer<RaftClientRequest> handler) {
     if (reply == null) {
       return null;
     }
@@ -443,15 +313,15 @@ final class RaftClientImpl implements RaftClient {
     if (nle == null) {
       return reply;
     }
-    return handleNotLeaderException(request, nle, resetSlidingWindow);
+    return handleNotLeaderException(request, nle, handler);
   }
 
   RaftClientReply handleNotLeaderException(RaftClientRequest request, 
NotLeaderException nle,
-      boolean resetSlidingWindow) {
+      Consumer<RaftClientRequest> handler) {
     refreshPeers(nle.getPeers());
     final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null
         : nle.getSuggestedLeader().getId();
-    handleIOException(request, nle, newLeader, resetSlidingWindow);
+    handleIOException(request, nle, newLeader, handler);
     return null;
   }
 
@@ -464,17 +334,20 @@ final class RaftClientImpl implements RaftClient {
     }
   }
 
+  void handleIOException(RaftClientRequest request, IOException ioe) {
+    handleIOException(request, ioe, null, null);
+  }
+
   void handleIOException(RaftClientRequest request, IOException ioe,
-      RaftPeerId newLeader, boolean resetSlidingWindow) {
+      RaftPeerId newLeader, Consumer<RaftClientRequest> handler) {
     LOG.debug("{}: suggested new leader: {}. Failed {} with {}",
         clientId, newLeader, request, ioe);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Stack trace", new Throwable("TRACE"));
     }
 
-    if (resetSlidingWindow) {
-      getSlidingWindow(request).resetFirstSeqNum();
-    }
+    Optional.ofNullable(handler).ifPresent(h -> h.accept(request));
+
     if (ioe instanceof LeaderNotReadyException) {
       return;
     }
@@ -496,11 +369,6 @@ final class RaftClientImpl implements RaftClient {
     clientRpc.handleException(oldLeader, ioe, changeLeader);
   }
 
-  void assertAsyncRequestSemaphore(int expectedAvailablePermits, int 
expectedQueueLength) {
-    Preconditions.assertTrue(asyncRequestSemaphore.availablePermits() == 
expectedAvailablePermits);
-    Preconditions.assertTrue(asyncRequestSemaphore.getQueueLength() == 
expectedQueueLength);
-  }
-
   void assertScheduler(int numThreads) {
     Preconditions.assertTrue(scheduler.getNumThreads() == numThreads);
   }
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
index 7426d32..62b7d32 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
@@ -25,9 +25,8 @@ import org.apache.ratis.protocol.RaftPeerId;
 
 /** Interface for testing raft client. */
 public interface RaftClientTestUtil {
-  static void assertAsyncRequestSemaphore(
-      RaftClient client, int expectedAvailablePermits, int 
expectedQueueLength) {
-    ((RaftClientImpl) 
client).assertAsyncRequestSemaphore(expectedAvailablePermits, 
expectedQueueLength);
+  static void assertAsyncRequestSemaphore(RaftClient client, int 
expectedAvailablePermits, int expectedQueueLength) {
+    ((RaftClientImpl) 
client).getOrderedAsync().assertRequestSemaphore(expectedAvailablePermits, 
expectedQueueLength);
   }
 
   static void assertScheduler(RaftClient client, int numThreads){
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 7637e8d..d248298 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -75,15 +75,15 @@ public interface UnorderedAsync {
       try {
         LOG.debug("{}: attempt #{} receive~ {}", clientId, attemptCount, 
reply);
         final RaftException replyException = reply != null? 
reply.getException(): null;
-        reply = client.handleNotLeaderException(request, reply, false);
+        reply = client.handleNotLeaderException(request, reply, null);
         if (reply != null) {
           f.complete(reply);
           return;
         }
         final RetryPolicy retryPolicy = client.getRetryPolicy();
         if (!retryPolicy.shouldRetry(attemptCount, request)) {
-          f.completeExceptionally(RaftClientImpl.noMoreRetries(
-              request, attemptCount, retryPolicy, replyException != null? 
replyException: e));
+          f.completeExceptionally(
+              client.noMoreRetries(request, attemptCount, replyException != 
null? replyException: e));
           return;
         }
 
@@ -97,12 +97,12 @@ public interface UnorderedAsync {
 
           if (e instanceof IOException) {
             if (e instanceof NotLeaderException) {
-              client.handleNotLeaderException(request, (NotLeaderException) e, 
false);
+              client.handleNotLeaderException(request, (NotLeaderException) e, 
null);
             } else if (e instanceof GroupMismatchException) {
               f.completeExceptionally(e);
               return;
             } else {
-              client.handleIOException(request, (IOException) e, null, false);
+              client.handleIOException(request, (IOException) e);
             }
           } else {
             if (!client.getClientRpc().handleException(request.getServerId(), 
e, false)) {

Reply via email to