This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b613c1  RATIS-568. Server sliding window should fail out of order requests on close. Contributed by Tsz Wo Nicholas Sze.
4b613c1 is described below

commit 4b613c1e2e91e91de7a84dc378a2e67da7fb092b
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed May 29 10:17:54 2019 +0530

    RATIS-568. Server sliding window should fail out of order requests on close. Contributed by Tsz Wo Nicholas Sze.
---
 .../apache/ratis/client/impl/RaftClientImpl.java   | 14 ++--
 .../apache/ratis/client/impl/UnorderedAsync.java   |  2 +-
 .../org/apache/ratis/protocol/RaftClientReply.java |  6 ++
 .../java/org/apache/ratis/util/SlidingWindow.java  | 79 ++++++++++++++--------
 .../grpc/client/GrpcClientProtocolService.java     | 38 +++++++----
 5 files changed, 88 insertions(+), 51 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 9caf95e..024f6c7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -206,7 +206,7 @@ final class RaftClientImpl implements RaftClient {
         slidingWindowEntry -> newRaftClientRequest(server, callId, message, 
type, slidingWindowEntry));
     return getSlidingWindow(server).submitNewRequest(constructor, 
this::sendRequestWithRetryAsync
     ).getReplyFuture(
-    ).thenApply(reply -> handleStateMachineException(reply, 
CompletionException::new)
+    ).thenApply(reply -> handleRaftException(reply, CompletionException::new)
     ).whenComplete((r, e) -> asyncRequestSemaphore.release());
   }
 
@@ -415,16 +415,16 @@ final class RaftClientImpl implements RaftClient {
     }
     LOG.debug("{}: receive {}", clientId, reply);
     reply = handleNotLeaderException(request, reply, false);
-    reply = handleStateMachineException(reply, Function.identity());
+    reply = handleRaftException(reply, Function.identity());
     return reply;
   }
 
-  static <E extends Throwable> RaftClientReply handleStateMachineException(
-      RaftClientReply reply, Function<StateMachineException, E> converter) 
throws E {
+  static <E extends Throwable> RaftClientReply handleRaftException(
+      RaftClientReply reply, Function<RaftException, E> converter) throws E {
     if (reply != null) {
-      final StateMachineException sme = reply.getStateMachineException();
-      if (sme != null) {
-        throw converter.apply(sme);
+      final RaftException e = reply.getException();
+      if (e != null) {
+        throw converter.apply(e);
       }
     }
     return reply;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index d5ceeaf..7637e8d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -57,7 +57,7 @@ public interface UnorderedAsync {
         () -> client.newRaftClientRequest(null, callId, null, type, null));
     sendRequestWithRetry(pending, client);
     return pending.getReplyFuture()
-        .thenApply(reply -> RaftClientImpl.handleStateMachineException(reply, 
CompletionException::new));
+        .thenApply(reply -> RaftClientImpl.handleRaftException(reply, 
CompletionException::new));
   }
 
   static void sendRequestWithRetry(PendingClientRequest pending, 
RaftClientImpl client) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 9539806..f06a0e5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -67,6 +67,7 @@ public class RaftClientReply extends RaftClientMessage {
       Preconditions.assertTrue(!success,
           () -> "Inconsistent parameters: success && exception != null: " + 
this);
       Preconditions.assertTrue(ReflectionUtils.isInstance(exception,
+          AlreadyClosedException.class,
           NotLeaderException.class, NotReplicatedException.class, 
StateMachineException.class),
           () -> "Unexpected exception class: " + this);
     }
@@ -130,6 +131,11 @@ public class RaftClientReply extends RaftClientMessage {
     return message;
   }
 
+  /** If this reply has {@link AlreadyClosedException}, return it; otherwise 
return null. */
+  public AlreadyClosedException getAlreadyClosedException() {
+    return JavaUtils.cast(exception, AlreadyClosedException.class);
+  }
+
   /** If this reply has {@link NotLeaderException}, return it; otherwise 
return null. */
   public NotLeaderException getNotLeaderException() {
     return JavaUtils.cast(exception, NotLeaderException.class);
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 8a3237a..b572832 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -37,18 +37,22 @@ import java.util.function.LongFunction;
 public interface SlidingWindow {
   Logger LOG = LoggerFactory.getLogger(SlidingWindow.class);
 
+  static String getName(Class<?> clazz, Object name) {
+    return SlidingWindow.class.getSimpleName() +  "$" + clazz.getSimpleName() 
+ ":" + name;
+  }
+
   interface Request<REPLY> {
     long getSeqNum();
 
     void setReply(REPLY reply);
 
     boolean hasReply();
+
+    void fail(Throwable e);
   }
 
   interface ClientSideRequest<REPLY> extends Request<REPLY> {
     void setFirstRequest();
-
-    void fail(Throwable e);
   }
 
   interface ServerSideRequest<REPLY> extends Request<REPLY> {
@@ -65,7 +69,7 @@ public interface SlidingWindow {
     RequestMap(Object name) {
       this.name = name;
       if (LOG_REPEATEDLY && LOG.isDebugEnabled()) {
-        JavaUtils.runRepeatedly(() -> log(), 5, 10, TimeUnit.SECONDS);
+        JavaUtils.runRepeatedly(this::log, 5, 10, TimeUnit.SECONDS);
       }
     }
 
@@ -73,10 +77,14 @@ public interface SlidingWindow {
       return name;
     }
 
-    boolean isEmpty() {
+    synchronized boolean isEmpty() {
       return requests.isEmpty();
     }
 
+    private synchronized REQUEST get(long seqNum) {
+      return requests.get(seqNum);
+    }
+
     /**
      * If the request with the given seqNum is non-replied, return it.
      * Otherwise, return null.
@@ -86,7 +94,7 @@ public interface SlidingWindow {
      * (2) it does not has reply.
      */
     REQUEST getNonRepliedRequest(long seqNum, String op) {
-      final REQUEST request = requests.get(seqNum);
+      final REQUEST request = get(seqNum);
       if (request == null) {
         LOG.debug("{}: {}, seq={} not found in {}", getName(), op, seqNum, 
this);
         return null;
@@ -98,11 +106,11 @@ public interface SlidingWindow {
       return request;
     }
 
-    long firstSeqNum() {
+    synchronized long firstSeqNum() {
       return requests.firstKey();
     }
 
-    long lastSeqNum() {
+    synchronized long lastSeqNum() {
       return requests.lastKey();
     }
 
@@ -112,7 +120,7 @@ public interface SlidingWindow {
       return requests.values().iterator();
     }
 
-    void putNewRequest(REQUEST request) {
+    synchronized void putNewRequest(REQUEST request) {
       final long seqNum = request.getSeqNum();
       CollectionUtils.putNew(seqNum, request, requests, () -> getName() + 
":requests");
     }
@@ -123,8 +131,8 @@ public interface SlidingWindow {
      *
      * @return true iff this method does set the reply for the request.
      */
-    boolean setReply(long seqNum, REPLY reply, String op) {
-      final REQUEST request = getNonRepliedRequest(seqNum, op);
+    boolean setReply(long seqNum, REPLY reply) {
+      final REQUEST request = getNonRepliedRequest(seqNum, "setReply");
       if (request == null) {
         LOG.debug("{}: DUPLICATED reply {} for seq={} in {}", getName(), 
reply, seqNum, this);
         return false;
@@ -135,6 +143,24 @@ public interface SlidingWindow {
       return true;
     }
 
+    synchronized void endOfRequests(long nextToProcess, REQUEST end, 
Consumer<REQUEST> replyMethod) {
+      final REQUEST nextToProcessRequest = requests.get(nextToProcess);
+      Preconditions.assertNull(nextToProcessRequest,
+          () -> "nextToProcessRequest = " + nextToProcessRequest + " != null, 
nextToProcess = " + nextToProcess);
+
+      final SortedMap<Long, REQUEST> tail = requests.tailMap(nextToProcess);
+      for (REQUEST r : tail.values()) {
+        final AlreadyClosedException e = new AlreadyClosedException(
+            getName() + " is closing: seq = " + r.getSeqNum() + " > 
nextToProcess = " + nextToProcess
+                + " will NEVER be processed; request = " + r);
+        r.fail(e);
+        replyMethod.accept(r);
+      }
+      tail.clear();
+
+      putNewRequest(end);
+    }
+
     synchronized void clear() {
       LOG.debug("close {}", this);
       requests.clear();
@@ -148,7 +174,7 @@ public interface SlidingWindow {
     }
 
     @Override
-    public String toString() {
+    public synchronized String toString() {
       return getName() + ": requests" + asString(requests);
     }
 
@@ -184,7 +210,7 @@ public interface SlidingWindow {
     private Throwable exception;
 
     public Client(Object name) {
-      this.requests = new RequestMap<REQUEST, REPLY>(name) {
+      this.requests = new RequestMap<REQUEST, REPLY>(getName(getClass(), 
name)) {
         @Override
         synchronized void log() {
           LOG.debug(toString());
@@ -284,7 +310,7 @@ public interface SlidingWindow {
      */
     public synchronized void receiveReply(
         long seqNum, REPLY reply, Consumer<REQUEST> sendMethod) {
-      if (!requests.setReply(seqNum, reply, "receiveReply")) {
+      if (!requests.setReply(seqNum, reply)) {
         return; // request already replied
       }
       if (seqNum == firstSeqNum) {
@@ -346,8 +372,7 @@ public interface SlidingWindow {
     }
 
     private void alreadyClosed(REQUEST request, Throwable e) {
-      request.fail(new 
AlreadyClosedException(SlidingWindow.class.getSimpleName() + "$" + 
getClass().getSimpleName()
-          + " " + requests.getName() + " is closed.", e));
+      request.fail(new AlreadyClosedException(requests.getName() + " is 
closed.", e));
     }
   }
 
@@ -368,7 +393,7 @@ public interface SlidingWindow {
     private long nextToProcess = -1;
 
     public Server(Object name, REQUEST end) {
-      this.requests = new RequestMap<>(name);
+      this.requests = new RequestMap<>(getName(getClass(), name));
       this.end = end;
       Preconditions.assertTrue(end.getSeqNum() == Long.MAX_VALUE);
     }
@@ -404,20 +429,16 @@ public interface SlidingWindow {
 
     /**
      * Receives a reply for the given seqNum (may out-of-order) from the 
processor.
-     * It may trigger sending replies to client or processing more requests.
+     * It may trigger sending replies to client.
      */
-    public synchronized void receiveReply(
-        long seqNum, REPLY reply, Consumer<REQUEST> replyMethod, 
Consumer<REQUEST> processingMethod) {
-      if (!requests.setReply(seqNum, reply, "receiveReply")) {
+    public synchronized void receiveReply(long seqNum, REPLY reply, 
Consumer<REQUEST> replyMethod) {
+      if (!requests.setReply(seqNum, reply)) {
         return; // request already replied
       }
       sendRepliesFromHead(replyMethod);
-      processRequestsFromHead(processingMethod);
     }
 
-    private void sendRepliesFromHead(
-        Consumer<REQUEST> replyMethod
-    ) {
+    private void sendRepliesFromHead(Consumer<REQUEST> replyMethod) {
       for(final Iterator<REQUEST> i = requests.iterator(); i.hasNext(); 
i.remove()) {
         final REQUEST r = i.next();
         if (!r.hasReply()) {
@@ -434,14 +455,14 @@ public interface SlidingWindow {
      * Signal the end of requests.
      * @return true if no more outstanding requests.
      */
-    public synchronized boolean endOfRequests() {
+    public synchronized boolean endOfRequests(Consumer<REQUEST> replyMethod) {
       if (requests.isEmpty()) {
         return true;
-      } else {
-        LOG.debug("{}: put end-of-request in {}", requests.getName(), this);
-        requests.putNewRequest(end);
-        return false;
       }
+
+      LOG.debug("{}: put end-of-request in {}", requests.getName(), this);
+      requests.endOfRequests(nextToProcess, end, replyMethod);
+      return false;
     }
 
     @Override
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index b0a7578..9c2fc0c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -40,6 +40,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
@@ -48,24 +49,31 @@ public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase
 
   private static class PendingOrderedRequest implements 
SlidingWindow.ServerSideRequest<RaftClientReply> {
     private final RaftClientRequest request;
-    private volatile RaftClientReply reply;
+    private final AtomicReference<RaftClientReply> reply = new 
AtomicReference<>();
 
     PendingOrderedRequest(RaftClientRequest request) {
       this.request = request;
     }
 
     @Override
+    public void fail(Throwable t) {
+      Preconditions.assertTrue(t instanceof RaftException, () -> "Requires 
RaftException but " + t);
+      setReply(new RaftClientReply(request, (RaftException) t, null));
+    }
+
+    @Override
     public boolean hasReply() {
-      return reply != null || this == COMPLETED;
+      return getReply() != null || this == COMPLETED;
     }
 
     @Override
-    public void setReply(RaftClientReply reply) {
-      this.reply = reply;
+    public void setReply(RaftClientReply r) {
+      final boolean set = reply.compareAndSet(null, r);
+      Preconditions.assertTrue(set, () -> "Reply is already set: request=" + 
request + ", reply=" + reply);
     }
 
     RaftClientReply getReply() {
-      return reply;
+      return reply.get();
     }
 
     RaftClientRequest getRequest() {
@@ -105,7 +113,7 @@ public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase
       // Okay if an existing object is removed by another mean during the 
iteration since it must be already closed.
       // Also okay if a new object is added during the iteration since this 
method closes only the existing objects.
       for(OrderedRequestStreamObserver so : map.values()) {
-        so.close();
+        so.close(true);
       }
     }
   }
@@ -304,7 +312,7 @@ public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase
     void processClientRequest(PendingOrderedRequest pending) {
       final long seq = pending.getSeqNum();
       processClientRequest(pending.getRequest(),
-          reply -> slidingWindow.receiveReply(seq, reply, this::sendReply, 
this::processClientRequest));
+          reply -> slidingWindow.receiveReply(seq, reply, this::sendReply));
     }
 
     @Override
@@ -315,7 +323,7 @@ public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase
     private void sendReply(PendingOrderedRequest ready) {
       Preconditions.assertTrue(ready.hasReply());
       if (ready == COMPLETED) {
-        close();
+        close(true);
       } else {
         LOG.debug("{}: sendReply seq={}, {}", getName(), ready.getSeqNum(), 
ready.getReply());
         
responseNext(ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
@@ -326,20 +334,22 @@ public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase
     public void onError(Throwable t) {
       // for now we just log a msg
       GrpcUtil.warn(LOG, () -> getName() + ": onError", t);
-      slidingWindow.close();
+      close(false);
     }
 
     @Override
     public void onCompleted() {
-      if (slidingWindow.endOfRequests()) {
-        close();
+      if (slidingWindow.endOfRequests(this::sendReply)) {
+        close(true);
       }
     }
 
-    private void close() {
+    private void close(boolean complete) {
       if (setClose()) {
         LOG.debug("{}: close", getName());
-        responseCompleted();
+        if (complete) {
+          responseCompleted();
+        }
         slidingWindow.close();
         orderedStreamObservers.removeExisting(this);
       }
@@ -348,7 +358,7 @@ public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase
     @Override
     boolean responseError(Throwable t, Supplier<String> message) {
       if (super.responseError(t, message)) {
-        slidingWindow.close();
+        close(false);
         return true;
       }
       return false;

Reply via email to