This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a7fac9  RATIS-571. Client may send first request in sliding window with firstFlag as false. Contributed by Lokesh Jain.
8a7fac9 is described below

commit 8a7fac921405baf3bfff0fc4fe41aadec2c5abeb
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Jun 4 14:20:24 2019 +0530

    RATIS-571. Client may send first request in sliding window with firstFlag as false. Contributed by Lokesh Jain.
---
 .../org/apache/ratis/client/RaftClientRpc.java     | 11 +++++++-
 .../org/apache/ratis/client/impl/OrderedAsync.java | 29 +++++++++++++++++-----
 .../apache/ratis/client/impl/RaftClientImpl.java   | 28 ++++++++++++++++-----
 .../java/org/apache/ratis/util/SlidingWindow.java  | 25 +++++++++++--------
 .../grpc/client/GrpcClientProtocolClient.java      | 13 +++-------
 .../apache/ratis/grpc/client/GrpcClientRpc.java    | 13 +++-------
 .../apache/ratis/grpc/TestRaftServerWithGrpc.java  |  1 +
 7 files changed, 79 insertions(+), 41 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
index abdfd41..395dc59 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -47,11 +47,20 @@ public interface RaftClientRpc extends Closeable {
   void addServers(Iterable<RaftPeer> servers);
 
   /**
-   * Handle the given throwable.  For example, try reconnecting.
+   * Handle the given throwable. For example, try reconnecting.
    *
    * @return true if the given throwable is handled; otherwise, the call is an 
no-op, return false.
    */
   default boolean handleException(RaftPeerId serverId, Throwable t, boolean 
reconnect) {
     return false;
   }
+
+  /**
+   * Determine if the given throwable should be handled. For example, try 
reconnecting.
+   *
+   * @return true if the given throwable should be handled; otherwise, return 
false.
+   */
+  default boolean shouldReconnect(Throwable t) {
+    return false;
+  }
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 4b5991c..7cb7813 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftException;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
@@ -57,6 +58,7 @@ class OrderedAsync {
     private final Function<SlidingWindowEntry, RaftClientRequest> 
requestConstructor;
     private final long seqNum;
     private volatile boolean isFirst = false;
+    private volatile RaftClientRequest request;
 
     PendingOrderedRequest(long seqNum, Function<SlidingWindowEntry, 
RaftClientRequest> requestConstructor) {
       this.seqNum = seqNum;
@@ -65,7 +67,12 @@ class OrderedAsync {
 
     @Override
     RaftClientRequest newRequestImpl() {
-      return requestConstructor.apply(ProtoUtils.toSlidingWindowEntry(seqNum, 
isFirst));
+      request = 
requestConstructor.apply(ProtoUtils.toSlidingWindowEntry(seqNum, isFirst));
+      return request;
+    }
+
+    RaftClientRequest getRequest() {
+      return request;
     }
 
     @Override
@@ -158,13 +165,13 @@ class OrderedAsync {
       return;
     }
 
-    final RaftClientRequest request = pending.newRequest();
-    sendRequest(request, pending.getAttemptCount()).thenAccept(reply -> {
+    sendRequest(pending).thenAccept(reply -> {
       if (f.isDone()) {
         return;
       }
       if (reply == null) {
         final int attempt = pending.getAttemptCount();
+        RaftClientRequest request = pending.getRequest();
         LOG.debug("schedule* attempt #{} with policy {} for {}", attempt, 
retryPolicy, request);
         client.getScheduler().onTimeout(retryPolicy.getSleepTime(attempt, 
request),
             () -> getSlidingWindow(request).retry(pending, 
this::sendRequestWithRetry),
@@ -175,10 +182,20 @@ class OrderedAsync {
     
}).exceptionally(FunctionUtils.consumerAsNullFunction(f::completeExceptionally));
   }
 
-  private CompletableFuture<RaftClientReply> sendRequest(RaftClientRequest 
request, int attemptCount) {
+  private CompletableFuture<RaftClientReply> sendRequest(PendingOrderedRequest 
pending) {
     final RetryPolicy retryPolicy = client.getRetryPolicy();
-    LOG.debug("{}: send* {}", client.getId(), request);
-    return client.getClientRpc().sendRequestAsync(request).thenApply(reply -> {
+    final CompletableFuture<RaftClientReply> f;
+    final RaftClientRequest request;
+    try(AutoCloseableLock readLock = client.readLock()) {
+      if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) {
+        pending.setFirstRequest();
+      }
+      request = pending.newRequest();
+      LOG.debug("{}: send* {}", client.getId(), request);
+      f = client.getClientRpc().sendRequestAsync(request);
+    }
+    int attemptCount = pending.getAttemptCount();
+    return f.thenApply(reply -> {
       LOG.debug("{}: receive* {}", client.getId(), reply);
       final RaftException replyException = reply != null? 
reply.getException(): null;
       reply = client.handleNotLeaderException(request, reply, 
this::resetSlidingWindow);
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index a31da14..a280b15 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -26,6 +26,7 @@ import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
@@ -41,6 +42,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -83,6 +85,7 @@ final class RaftClientImpl implements RaftClient {
   private volatile RaftPeerId leaderId;
 
   private final TimeoutScheduler scheduler;
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
   private final Supplier<OrderedAsync> orderedAsync;
 
@@ -353,20 +356,33 @@ final class RaftClientImpl implements RaftClient {
     }
 
     final RaftPeerId oldLeader = request.getServerId();
-    final RaftPeerId curLeader = request.getServerId();
+    final RaftPeerId curLeader = leaderId;
     final boolean stillLeader = oldLeader.equals(curLeader);
     if (newLeader == null && stillLeader) {
       newLeader = CollectionUtils.random(oldLeader,
           CollectionUtils.as(peers, RaftPeer::getId));
     }
-    LOG.debug("{}: oldLeader={},  curLeader={}, newLeader{}", clientId, 
oldLeader, curLeader, newLeader);
+    LOG.debug("{}: oldLeader={},  curLeader={}, newLeader={}", clientId, 
oldLeader, curLeader, newLeader);
 
     final boolean changeLeader = newLeader != null && stillLeader;
-    if (changeLeader) {
-      LOG.debug("{}: change Leader from {} to {}", clientId, oldLeader, 
newLeader);
-      this.leaderId = newLeader;
+    final boolean reconnect = changeLeader || clientRpc.shouldReconnect(ioe);
+    if (reconnect) {
+      try(AutoCloseableLock writeLock = writeLock()) {
+        if (changeLeader && oldLeader.equals(leaderId)) {
+          LOG.debug("{}: change Leader from {} to {}", clientId, oldLeader, 
newLeader);
+          this.leaderId = newLeader;
+        }
+        clientRpc.handleException(oldLeader, ioe, reconnect);
+      }
     }
-    clientRpc.handleException(oldLeader, ioe, changeLeader);
+  }
+
+  AutoCloseableLock readLock() {
+    return AutoCloseableLock.acquire(lock.readLock());
+  }
+
+  private AutoCloseableLock writeLock() {
+    return AutoCloseableLock.acquire(lock.writeLock());
   }
 
   void assertScheduler(int numThreads) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index b572832..7ce83d9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -25,6 +25,7 @@ import java.io.Closeable;
 import java.util.Iterator;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
@@ -64,7 +65,7 @@ public interface SlidingWindow {
     static boolean LOG_REPEATEDLY = false;
     private final Object name;
     /** Request map: seqNum -> request */
-    private final SortedMap<Long, REQUEST> requests = new TreeMap<>();
+    private final SortedMap<Long, REQUEST> requests = new 
ConcurrentSkipListMap<>();
 
     RequestMap(Object name) {
       this.name = name;
@@ -77,11 +78,11 @@ public interface SlidingWindow {
       return name;
     }
 
-    synchronized boolean isEmpty() {
+    boolean isEmpty() {
       return requests.isEmpty();
     }
 
-    private synchronized REQUEST get(long seqNum) {
+    private REQUEST get(long seqNum) {
       return requests.get(seqNum);
     }
 
@@ -106,11 +107,11 @@ public interface SlidingWindow {
       return request;
     }
 
-    synchronized long firstSeqNum() {
+    long firstSeqNum() {
       return requests.firstKey();
     }
 
-    synchronized long lastSeqNum() {
+    long lastSeqNum() {
       return requests.lastKey();
     }
 
@@ -120,7 +121,7 @@ public interface SlidingWindow {
       return requests.values().iterator();
     }
 
-    synchronized void putNewRequest(REQUEST request) {
+    void putNewRequest(REQUEST request) {
       final long seqNum = request.getSeqNum();
       CollectionUtils.putNew(seqNum, request, requests, () -> getName() + 
":requests");
     }
@@ -143,7 +144,7 @@ public interface SlidingWindow {
       return true;
     }
 
-    synchronized void endOfRequests(long nextToProcess, REQUEST end, 
Consumer<REQUEST> replyMethod) {
+    void endOfRequests(long nextToProcess, REQUEST end, Consumer<REQUEST> 
replyMethod) {
       final REQUEST nextToProcessRequest = requests.get(nextToProcess);
       Preconditions.assertNull(nextToProcessRequest,
           () -> "nextToProcessRequest = " + nextToProcessRequest + " != null, 
nextToProcess = " + nextToProcess);
@@ -161,12 +162,12 @@ public interface SlidingWindow {
       putNewRequest(end);
     }
 
-    synchronized void clear() {
+    void clear() {
       LOG.debug("close {}", this);
       requests.clear();
     }
 
-    synchronized void log() {
+    void log() {
       LOG.debug(this.toString());
       for(REQUEST r : requests.values()) {
         LOG.debug("  {}: hasReply? {}", r.getSeqNum(), r.hasReply());
@@ -174,7 +175,7 @@ public interface SlidingWindow {
     }
 
     @Override
-    public synchronized String toString() {
+    public String toString() {
       return getName() + ": requests" + asString(requests);
     }
 
@@ -374,6 +375,10 @@ public interface SlidingWindow {
     private void alreadyClosed(REQUEST request, Throwable e) {
       request.fail(new AlreadyClosedException(requests.getName() + " is 
closed.", e));
     }
+
+    public boolean isFirst(long seqNum) {
+      return seqNum == (firstSeqNum != -1 ? firstSeqNum : 
requests.firstSeqNum());
+    }
   }
 
   /**
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 2ed5df0..e6a2c54 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -180,12 +180,12 @@ public class GrpcClientProtocolClient implements Closeable {
 
   AsyncStreamObservers getOrderedStreamObservers() {
     return orderedStreamObservers.updateAndGet(
-        a -> a != null? a : new AsyncStreamObservers(orderedStreamObservers, 
this::ordered));
+        a -> a != null? a : new AsyncStreamObservers(this::ordered));
   }
 
   AsyncStreamObservers getUnorderedAsyncStreamObservers() {
     return unorderedStreamObservers.updateAndGet(
-        a -> a != null? a : new AsyncStreamObservers(unorderedStreamObservers, 
asyncStub::unordered));
+        a -> a != null? a : new AsyncStreamObservers(asyncStub::unordered));
   }
 
   public RaftPeer getTarget() {
@@ -275,12 +275,9 @@ public class GrpcClientProtocolClient implements Closeable {
       }
     };
     private final RequestStreamer requestStreamer;
-    private final AtomicReference<AsyncStreamObservers> ref;
 
-    AsyncStreamObservers(AtomicReference<AsyncStreamObservers> ref,
-        Function<StreamObserver<RaftClientReplyProto>, 
StreamObserver<RaftClientRequestProto>> f) {
+    AsyncStreamObservers(Function<StreamObserver<RaftClientReplyProto>, 
StreamObserver<RaftClientRequestProto>> f) {
       this.requestStreamer = new RequestStreamer(f.apply(replyStreamObserver));
-      this.ref = ref;
     }
 
     CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
@@ -290,7 +287,7 @@ public class GrpcClientProtocolClient implements Closeable {
       }
       try {
         if (!requestStreamer.onNext(ClientProtoUtils.toRaftClientRequestProto(request))) {
-          throw new AlreadyClosedException(getName() + ": the stream is 
closed.");
+          return JavaUtils.completeExceptionally(new 
AlreadyClosedException(getName() + ": the stream is closed."));
         }
       } catch(Throwable t) {
         handleReplyFuture(request.getCallId(), future -> 
future.completeExceptionally(t));
@@ -317,8 +314,6 @@ public class GrpcClientProtocolClient implements Closeable {
     }
 
     private void completeReplyExceptionally(Throwable t, String event) {
-      ref.compareAndSet(this, null);
-
       final Map<Long, CompletableFuture<RaftClientReply>> map = 
replies.getAndSetNull();
       if (map == null) {
         return;
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index deaaac0..31b34ee 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -163,18 +163,13 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
   }
 
   @Override
-  public boolean handleException(RaftPeerId serverId, Throwable e, boolean 
reconnect) {
+  public boolean shouldReconnect(Throwable e) {
     final Throwable cause = e.getCause();
     if (e instanceof IOException && cause instanceof StatusRuntimeException) {
-      if (!((StatusRuntimeException) cause).getStatus().isOk()) {
-        reconnect = true;
-      }
+      return !((StatusRuntimeException) cause).getStatus().isOk();
     } else if (e instanceof IllegalArgumentException) {
-      if (e.getMessage().contains("null frame before EOS")) {
-        reconnect = true;
-      }
+      return e.getMessage().contains("null frame before EOS");
     }
-    LOG.debug("{}->{}: reconnect? {}, e={}, cause={}", clientId, serverId, 
reconnect, e, cause);
-    return super.handleException(serverId, e, reconnect);
+    return false;
   }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 177e0f7..05048b7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -159,6 +159,7 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
 
       // send one more request which should timeout.
       final RaftClientRequest requestTimeout = newRaftClientRequest(client, 
leader.getId(), seqNum.incrementAndGet());
+      rpc.handleException(leader.getId(), new Exception(), true);
       final CompletableFuture<RaftClientReply> f = 
rpc.sendRequestAsync(requestTimeout);
       testFailureCase("request should timeout", f::get,
           ExecutionException.class, TimeoutIOException.class);

Reply via email to