This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c1638d  RATIS-704. Invoke sendAsync as soon as OrderedAsync is created. Contributed by Tsz Wo Nicholas Sze.
9c1638d is described below

commit 9c1638dbe6ef31246d780331b0cc8d27a8dc0c60
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Oct 24 12:48:08 2019 +0530

    RATIS-704. Invoke sendAsync as soon as OrderedAsync is created. Contributed by Tsz Wo Nicholas Sze.
---
 .../apache/ratis/client/RaftClientConfigKeys.java  | 15 ++++++++++++-
 .../org/apache/ratis/client/impl/OrderedAsync.java | 26 ++++++++++++++++------
 .../apache/ratis/client/impl/RaftClientImpl.java   |  2 +-
 .../apache/ratis/protocol/RaftClientRequest.java   |  5 +++++
 .../java/org/apache/ratis/util/PeerProxyMap.java   |  1 +
 .../apache/ratis/grpc/client/GrpcClientRpc.java    |  2 +-
 .../org/apache/ratis/RaftAsyncExceptionTests.java  | 20 ++++++++++-------
 .../test/java/org/apache/ratis/RaftAsyncTests.java | 14 +++++++++---
 .../test/java/org/apache/ratis/RaftBasicTests.java |  5 +++--
 9 files changed, 67 insertions(+), 23 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index bb01249..d2c3679 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -62,6 +62,19 @@ public interface RaftClientConfigKeys {
     static void setMaxOutstandingRequests(RaftProperties properties, int 
outstandingRequests) {
       setInt(properties::setInt, MAX_OUTSTANDING_REQUESTS_KEY, 
outstandingRequests);
     }
+
+    interface Experimental {
+      String PREFIX = Async.PREFIX + "." + 
Experimental.class.getSimpleName().toLowerCase();
+
+      String SEND_DUMMY_REQUEST_KEY = PREFIX + ".send-dummy-request";
+      boolean SEND_DUMMY_REQUEST_DEFAULT = true;
+      static boolean sendDummyRequest(RaftProperties properties) {
+        return getBoolean(properties::getBoolean, SEND_DUMMY_REQUEST_KEY, 
SEND_DUMMY_REQUEST_DEFAULT, getDefaultLog());
+      }
+      static void setSendDummyRequest(RaftProperties properties, boolean 
sendDummyRequest) {
+        setBoolean(properties::setBoolean, SEND_DUMMY_REQUEST_KEY, 
sendDummyRequest);
+      }
+    }
   }
 
   static void main(String[] args) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 7694450..79ee050 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -53,8 +53,8 @@ import java.util.function.Function;
 import java.util.function.LongFunction;
 
 /** Send ordered asynchronous requests to a raft service. */
-class OrderedAsync {
-  private static final Logger LOG = 
LoggerFactory.getLogger(OrderedAsync.class);
+public final class OrderedAsync {
+  public static final Logger LOG = LoggerFactory.getLogger(OrderedAsync.class);
 
   static class PendingOrderedRequest extends PendingClientRequest
       implements SlidingWindow.ClientSideRequest<RaftClientReply> {
@@ -110,13 +110,23 @@ class OrderedAsync {
     }
   }
 
+  static OrderedAsync newInstance(RaftClientImpl client, RaftProperties 
properties) {
+    final OrderedAsync ordered = new OrderedAsync(client, properties);
+    // send a dummy watch request to establish the connection
+    // TODO: this is a work around, it is better to fix the underlying RPC implementation
+    if (RaftClientConfigKeys.Async.Experimental.sendDummyRequest(properties)) {
+      ordered.send(RaftClientRequest.watchRequestType(), null, null);
+    }
+    return ordered;
+  }
+
   private final RaftClientImpl client;
   /** Map: id -> {@link SlidingWindow}, in order to support async calls to the Raft service or individual servers. */
   private final ConcurrentMap<String, 
SlidingWindow.Client<PendingOrderedRequest, RaftClientReply>> slidingWindows
       = new ConcurrentHashMap<>();
   private final Semaphore requestSemaphore;
 
-  OrderedAsync(RaftClientImpl client, RaftProperties properties) {
+  private OrderedAsync(RaftClientImpl client, RaftProperties properties) {
     this.client = Objects.requireNonNull(client, "client == null");
     this.requestSemaphore = new 
Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
   }
@@ -170,6 +180,7 @@ class OrderedAsync {
 
     final RaftClientRequest request = pending.newRequestImpl();
     if (request == null) { // already done
+      LOG.debug("{} newRequestImpl returns null", pending);
       return;
     }
 
@@ -198,14 +209,15 @@ class OrderedAsync {
 
   private void scheduleWithTimeout(PendingOrderedRequest pending, 
RaftClientRequest request, RetryPolicy retryPolicy) {
     final int attempt = pending.getAttemptCount();
-    LOG.debug("schedule* attempt #{} with policy {} for {}", attempt, 
retryPolicy, request);
     final TimeDuration sleepTime = retryPolicy.getSleepTime(attempt, request);
-    scheduleWithTimeout(pending, request.getServerId(), sleepTime);
+    LOG.debug("schedule* attempt #{} with sleep {} and policy {} for {}", 
attempt, sleepTime, retryPolicy, request);
+    scheduleWithTimeout(pending, sleepTime, getSlidingWindow(request));
   }
 
-  private void scheduleWithTimeout(PendingOrderedRequest pending, RaftPeerId 
serverId, TimeDuration sleepTime) {
+  private void scheduleWithTimeout(PendingOrderedRequest pending, TimeDuration 
sleepTime,
+      SlidingWindow.Client<PendingOrderedRequest, RaftClientReply> 
slidingWindow) {
     client.getScheduler().onTimeout(sleepTime,
-        () -> getSlidingWindow(serverId).retry(pending, 
this::sendRequestWithRetry),
+        () -> slidingWindow.retry(pending, this::sendRequestWithRetry),
         LOG, () -> "Failed* to retry " + pending);
   }
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 582b6b6..e1df86b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -99,7 +99,7 @@ final class RaftClientImpl implements RaftClient {
     scheduler = TimeoutScheduler.getInstance();
     clientRpc.addServers(peers);
 
-    this.orderedAsync = JavaUtils.memoize(() -> new OrderedAsync(this, 
properties));
+    this.orderedAsync = JavaUtils.memoize(() -> OrderedAsync.newInstance(this, 
properties));
   }
 
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 4c10c0c..a9e5064 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -30,6 +30,8 @@ import static 
org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.
  */
 public class RaftClientRequest extends RaftClientMessage {
   private static final Type WRITE_DEFAULT = new 
Type(WriteRequestTypeProto.getDefaultInstance());
+  private static final Type WATCH_DEFAULT = new Type(
+      
WatchRequestTypeProto.newBuilder().setIndex(0L).setReplication(ReplicationLevel.MAJORITY).build());
 
   private static final Type DEFAULT_READ = new 
Type(ReadRequestTypeProto.getDefaultInstance());
   private static final Type DEFAULT_STALE_READ = new 
Type(StaleReadRequestTypeProto.getDefaultInstance());
@@ -47,6 +49,9 @@ public class RaftClientRequest extends RaftClientMessage {
         : new 
Type(StaleReadRequestTypeProto.newBuilder().setMinIndex(minIndex).build());
   }
 
+  public static Type watchRequestType() {
+    return WATCH_DEFAULT;
+  }
   public static Type watchRequestType(long index, ReplicationLevel 
replication) {
     return new 
Type(WatchRequestTypeProto.newBuilder().setIndex(index).setReplication(replication).build());
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index ea0c2f1..6327adb 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -98,6 +98,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements 
Closeable {
   }
 
   public PROXY getProxy(RaftPeerId id) throws IOException {
+    Objects.requireNonNull(id, "id == null");
     PeerAndProxy p = peers.get(id);
     if (p == null) {
       synchronized (resetLock) {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 5883598..1fceb1b 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -66,7 +66,7 @@ public class GrpcClientRpc extends 
RaftClientRpcWithProxy<GrpcClientProtocolClie
       final GrpcClientProtocolClient proxy = getProxies().getProxy(serverId);
       // Reuse the same grpc stream for all async calls.
       return proxy.getOrderedStreamObservers().onNext(request);
-    } catch (IOException e) {
+    } catch (Throwable e) {
       return JavaUtils.completeExceptionally(e);
     }
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
index cdb0e6b..4e8f68e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
@@ -17,9 +17,11 @@
  */
 package org.apache.ratis;
 
-import org.apache.ratis.client.RaftClient;
+import org.apache.log4j.Level;
 import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.client.impl.OrderedAsync;
 import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.GroupMismatchException;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -27,7 +29,7 @@ import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.LogUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -36,20 +38,22 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 public abstract class RaftAsyncExceptionTests<CLUSTER extends MiniRaftCluster>
     extends BaseTest
     implements MiniRaftCluster.Factory.Get<CLUSTER> {
 
   {
+    LogUtils.setLogLevel(OrderedAsync.LOG, Level.DEBUG);
     getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
         SimpleStateMachine4Testing.class, StateMachine.class);
   }
 
   @Test
   public void testGroupMismatchException() throws Exception {
+    
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(), 
false);
     runWithNewCluster(1, this::runTestGroupMismatchException);
+    
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(), 
true);
   }
 
   private void runTestGroupMismatchException(CLUSTER cluster) throws Exception 
{
@@ -95,24 +99,24 @@ public abstract class RaftAsyncExceptionTests<CLUSTER 
extends MiniRaftCluster>
   private void runTestTimeoutException(CLUSTER cluster) throws Exception {
     // send a message to make sure the cluster is working
     try(RaftClient client = cluster.createClient()) {
-      client.send(new SimpleMessage("m0"));
+      final RaftClientReply reply = client.send(new SimpleMessage("m0"));
+      Assert.assertTrue(reply.isSuccess());
 
-      RaftClientConfigKeys.Rpc.setRequestTimeout(properties.get(),
-          TimeDuration.valueOf(3, TimeUnit.SECONDS));
+      RaftClientConfigKeys.Rpc.setRequestTimeout(properties.get(), ONE_SECOND);
       // Block StartTransaction
       cluster.getServers().stream()
           .map(cluster::getRaftServerImpl)
           .map(SimpleStateMachine4Testing::get)
           .forEach(SimpleStateMachine4Testing::blockStartTransaction);
       final CompletableFuture<RaftClientReply> replyFuture = 
client.sendAsync(new SimpleMessage("m1"));
-      Thread.sleep(10000);
+      FIVE_SECONDS.sleep();
       // Unblock StartTransaction
       cluster.getServers().stream()
           .map(cluster::getRaftServerImpl)
           .map(SimpleStateMachine4Testing::get)
           .forEach(SimpleStateMachine4Testing::unblockStartTransaction);
       // The request should succeed after start transaction is unblocked
-      Assert.assertTrue(replyFuture.get().isSuccess());
+      Assert.assertTrue(replyFuture.get(FIVE_SECONDS.getDuration(), 
FIVE_SECONDS.getUnit()).isSuccess());
     }
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 4ab5282..8760147 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -105,12 +105,18 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
 
   @Test
   public void testRequestAsyncWithRetryFailure() throws Exception {
-    runWithNewCluster(1, false, cluster -> 
runTestRequestAsyncWithRetryFailure(false, cluster));
+    runTestRequestAsyncWithRetryFailure(false);
   }
 
   @Test
   public void testRequestAsyncWithRetryFailureAfterInitialMessages() throws 
Exception {
-    runWithNewCluster(1, true, cluster -> 
runTestRequestAsyncWithRetryFailure(true, cluster));
+    runTestRequestAsyncWithRetryFailure(true);
+  }
+
+  void runTestRequestAsyncWithRetryFailure(boolean initialMessages) throws 
Exception {
+    
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(), 
false);
+    runWithNewCluster(1, initialMessages, cluster -> 
runTestRequestAsyncWithRetryFailure(initialMessages, cluster));
+    
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(), 
true);
   }
 
   void runTestRequestAsyncWithRetryFailure(boolean initialMessages, CLUSTER 
cluster) throws Exception {
@@ -345,11 +351,13 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
   @Test
   public void testRequestTimeout() throws Exception {
     final TimeDuration oldExpiryTime = 
RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
-    RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), 
TimeDuration.valueOf(5, TimeUnit.SECONDS));
+    RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), 
FIVE_SECONDS);
+    
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(), 
false);
     runWithNewCluster(NUM_SERVERS, cluster -> 
RaftBasicTests.testRequestTimeout(true, cluster, LOG));
 
     //reset for the other tests
     RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), 
oldExpiryTime);
+    
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(getProperties(), 
true);
   }
 
   @Test
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index a3cab64..f07eb7c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -37,6 +37,7 @@ import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.Timestamp;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -407,7 +408,7 @@ public abstract class RaftBasicTests<CLUSTER extends 
MiniRaftCluster>
 
   public static void testRequestTimeout(boolean async, MiniRaftCluster 
cluster, Logger LOG) throws Exception {
     waitForLeader(cluster);
-    long time = System.currentTimeMillis();
+    final Timestamp startTime = Timestamp.currentTime();
     try (final RaftClient client = cluster.createClient()) {
       // Get the next callId to be used by the client
       long callId = RaftClientTestUtil.getCallId(client);
@@ -428,7 +429,7 @@ public abstract class RaftBasicTests<CLUSTER extends 
MiniRaftCluster>
       // Eventually the request would be accepted by the server
       // when the retry cache entry is invalidated.
       // The duration for which the client waits should be more than the retryCacheExpiryDuration.
-      TimeDuration duration = TimeDuration.valueOf(System.currentTimeMillis() 
- time, TimeUnit.MILLISECONDS);
+      final TimeDuration duration = startTime.elapsedTime();
       TimeDuration retryCacheExpiryDuration = 
RaftServerConfigKeys.RetryCache.expiryTime(cluster.getProperties());
       Assert.assertTrue(duration.compareTo(retryCacheExpiryDuration) >= 0);
     }

Reply via email to