Repository: incubator-ratis
Updated Branches:
  refs/heads/master 564e89ee4 -> 86e744c1f


RATIS-113. Add Async send interface to RaftClient. Contributed by Lokesh Jain


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/86e744c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/86e744c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/86e744c1

Branch: refs/heads/master
Commit: 86e744c1f1d50a2b3530dfe568def4eb33aa6e05
Parents: 564e89e
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Tue Nov 14 13:28:21 2017 -0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Tue Nov 14 13:28:21 2017 -0800

----------------------------------------------------------------------
 .../org/apache/ratis/client/RaftClient.java     | 11 +++
 .../org/apache/ratis/client/RaftClientRpc.java  | 20 ++++-
 .../ratis/client/impl/ClientImplUtils.java      |  3 -
 .../ratis/client/impl/RaftClientImpl.java       | 65 +++++++++++++-
 .../apache/ratis/protocol/RaftClientReply.java  |  8 ++
 .../apache/ratis/grpc/client/GrpcClientRpc.java | 91 ++++++++++++--------
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java |  5 ++
 .../java/org/apache/ratis/RaftBasicTests.java   | 32 ++++++-
 .../java/org/apache/ratis/RaftTestUtil.java     | 11 ++-
 .../simulation/TestRaftWithSimulatedRpc.java    |  6 ++
 10 files changed, 204 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 6e1e5c1..4b152cb 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 
 /** A client who sends requests to a raft service. */
 public interface RaftClient extends Closeable {
@@ -42,6 +43,16 @@ public interface RaftClient extends Closeable {
   RaftClientRpc getClientRpc();
 
   /**
+   * Async call to send the given message to the raft service.
+   * The message may change the state of the service.
+   * For readonly messages, use {@link #sendReadOnlyAsync(Message)} instead.
+   */
+  CompletableFuture<RaftClientReply> sendAsync(Message message);
+
+  /** Async call to send the given readonly message to the raft service. */
+  CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message);
+
+  /**
    * Send the given message to the raft service.
    * The message may change the state of the service.
    * For readonly messages, use {@link #sendReadOnly(Message)} instead.

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
index 90e9570..310f9df 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -17,16 +17,30 @@
  */
 package org.apache.ratis.client;
 
-import java.io.Closeable;
-import java.io.IOException;
-
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
 /** The client side rpc of a raft service. */
 public interface RaftClientRpc extends Closeable {
+  /** Async call to send a request. */
+  default CompletableFuture<RaftClientReply> sendRequestAsync(
+      RaftClientRequest request) {
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        return sendRequest(request);
+      } catch (Exception e) {
+        throw new CompletionException(e);
+      }
+    });
+  }
+
   /** Send a request. */
   RaftClientReply sendRequest(RaftClientRequest request) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
index 07b07b0..2ae2f35 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
@@ -22,11 +22,8 @@ import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 
-import java.util.Collection;
-
 /** Client utilities for internal use. */
 public class ClientImplUtils {
   public static RaftClient newRaftClient(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 8a0ddef..ea2a3bc 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -19,7 +19,6 @@ package org.apache.ratis.client.impl;
 
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
-import org.apache.ratis.shaded.com.google.common.base.Predicates;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.TimeDuration;
@@ -28,9 +27,9 @@ import org.apache.ratis.protocol.*;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /** A client who sends requests to a raft service. */
@@ -49,6 +48,8 @@ final class RaftClientImpl implements RaftClient {
 
   private volatile RaftPeerId leaderId;
 
+  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
+
   RaftClientImpl(ClientId clientId, RaftGroup group,
       RaftPeerId leaderId, RaftClientRpc clientRpc,
       TimeDuration retryInterval) {
@@ -69,6 +70,30 @@ final class RaftClientImpl implements RaftClient {
   }
 
   @Override
+  public CompletableFuture<RaftClientReply> sendAsync(Message message) {
+    return sendAsync(message, false);
+  }
+
+  @Override
+  public CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message) {
+    return sendAsync(message, true);
+  }
+
+  private CompletableFuture<RaftClientReply> sendAsync(Message message,
+      boolean readOnly) {
+    Objects.requireNonNull(message, "message == null");
+    final long callId = nextCallId();
+    return sendRequestWithRetryAsync(
+        () -> new RaftClientRequest(clientId, leaderId, groupId, callId, message, readOnly)
+    ).thenApplyAsync(reply -> {
+      if (reply.hasStateMachineException() || reply.hasGroupMismatchException()) {
+        throw new CompletionException(reply.getException());
+      }
+      return reply;
+    });
+  }
+
+  @Override
   public RaftClientReply send(Message message) throws IOException {
     return send(message, false);
   }
@@ -124,6 +149,21 @@ final class RaftClientImpl implements RaftClient {
         peersInNewConf.filter(p -> !peers.contains(p))::iterator);
   }
 
+  private CompletableFuture<RaftClientReply> sendRequestWithRetryAsync(
+      Supplier<RaftClientRequest> supplier) {
+    return sendRequestAsync(supplier.get()).thenComposeAsync(reply -> {
+      final CompletableFuture<RaftClientReply> f = new CompletableFuture<>();
+      if (reply == null) {
+        final TimeUnit unit = retryInterval.getUnit();
+        scheduler.schedule(() -> sendRequestWithRetryAsync(supplier)
+            .thenApply(r -> f.complete(r)), retryInterval.toLong(unit), unit);
+      } else {
+        f.complete(reply);
+      }
+      return f;
+    });
+  }
+
   private RaftClientReply sendRequestWithRetry(
       Supplier<RaftClientRequest> supplier)
       throws InterruptedIOException, StateMachineException, GroupMismatchException {
@@ -145,6 +185,27 @@ final class RaftClientImpl implements RaftClient {
     }
   }
 
+  private CompletableFuture<RaftClientReply> sendRequestAsync(
+      RaftClientRequest request) {
+    LOG.debug("{}: sendAsync {}", clientId, request);
+    return clientRpc.sendRequestAsync(request).thenApplyAsync(reply -> {
+      LOG.debug("{}: receive {}", clientId, reply);
+      if (reply != null && reply.isNotLeader()) {
+        handleNotLeaderException(request, reply.getNotLeaderException());
+        return null;
+      }
+      return reply;
+    }).exceptionally(e -> {
+      final Throwable cause = e.getCause();
+      if (cause instanceof RaftException) {
+        return new RaftClientReply(request, (RaftException) cause);
+      } else if (cause instanceof IOException) {
+        handleIOException(request, (IOException) cause, null);
+      }
+      return null;
+    });
+  }
+
   private RaftClientReply sendRequest(RaftClientRequest request)
       throws StateMachineException, GroupMismatchException {
     LOG.debug("{}: send {}", clientId, request);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 60dc6c1..ea59352 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -93,4 +93,12 @@ public class RaftClientReply extends RaftClientMessage {
   public boolean hasStateMachineException() {
     return exception instanceof StateMachineException;
   }
+
+  public boolean hasGroupMismatchException(){
+    return exception instanceof GroupMismatchException;
+  }
+
+  public RaftException getException(){
+    return exception;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 3084289..2b7de70 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -51,6 +51,20 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
   }
 
   @Override
+  public CompletableFuture<RaftClientReply> sendRequestAsync(
+      RaftClientRequest request) {
+    final RaftPeerId serverId = request.getServerId();
+    try {
+      return sendRequestAsync(request, getProxies().getProxy(serverId));
+    } catch (IOException e) {
+      final CompletableFuture<RaftClientReply> replyFuture =
+          new CompletableFuture<>();
+      replyFuture.completeExceptionally(e);
+      return replyFuture;
+    }
+  }
+
+  @Override
   public RaftClientReply sendRequest(RaftClientRequest request)
       throws IOException {
     final RaftPeerId serverId = request.getServerId();
@@ -74,42 +88,11 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
         throw new IOException("msg size:" + requestProto.getSerializedSize() +
             " exceeds maximum:" + maxMessageSize);
       }
-      CompletableFuture<RaftClientReplyProto> replyFuture =
-          new CompletableFuture<>();
-      final StreamObserver<RaftClientRequestProto> requestObserver =
-          proxy.append(new StreamObserver<RaftClientReplyProto>() {
-            @Override
-            public void onNext(RaftClientReplyProto value) {
-              replyFuture.complete(value);
-            }
-
-            @Override
-            public void onError(Throwable t) {
-              // This implementation is used as RaftClientRpc. Retry
-              // logic on Exception is in RaftClient.
-              final IOException e;
-              if (t instanceof StatusRuntimeException) {
-                e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
-              } else {
-                e = IOUtils.asIOException(t);
-              }
-              replyFuture.completeExceptionally(e);
-            }
-
-            @Override
-            public void onCompleted() {
-              if (!replyFuture.isDone()) {
-                replyFuture.completeExceptionally(
-                    new IOException("No reply for request " + request));
-              }
-            }
-          });
-      requestObserver.onNext(requestProto);
-      requestObserver.onCompleted();
-
+      final CompletableFuture<RaftClientReply> replyFuture =
+                     sendRequestAsync(request, proxy);
       // TODO: timeout support
       try {
-        return toRaftClientReply(replyFuture.get());
+        return replyFuture.get();
       } catch (InterruptedException e) {
         throw new InterruptedIOException(
             "Interrupted while waiting for response of request " + request);
@@ -118,4 +101,44 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
       }
     }
   }
+
+  private CompletableFuture<RaftClientReply> sendRequestAsync(
+      RaftClientRequest request, RaftClientProtocolClient proxy) {
+    final RaftClientRequestProto requestProto =
+        toRaftClientRequestProto(request);
+    final CompletableFuture<RaftClientReplyProto> replyFuture =
+        new CompletableFuture<>();
+    final StreamObserver<RaftClientRequestProto> requestObserver =
+        proxy.append(new StreamObserver<RaftClientReplyProto>() {
+          @Override
+          public void onNext(RaftClientReplyProto value) {
+            replyFuture.complete(value);
+          }
+
+          @Override
+          public void onError(Throwable t) {
+            // This implementation is used as RaftClientRpc. Retry
+            // logic on Exception is in RaftClient.
+            final IOException e;
+            if (t instanceof StatusRuntimeException) {
+              e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
+            } else {
+              e = IOUtils.asIOException(t);
+            }
+            replyFuture.completeExceptionally(e);
+          }
+
+          @Override
+          public void onCompleted() {
+            if (!replyFuture.isDone()) {
+              replyFuture.completeExceptionally(
+                  new IOException("No reply for request " + request));
+            }
+          }
+        });
+    requestObserver.onNext(requestProto);
+    requestObserver.onCompleted();
+
+    return replyFuture.thenApply(replyProto -> toRaftClientReply(replyProto));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 76a64b3..2657bd1 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -56,6 +56,11 @@ public class TestRaftWithGrpc extends RaftBasicTests {
     BlockRequestHandlingInjection.getInstance().unblockAll();
   }
 
+  @Test
+  public void testBasicAppendEntriesAsync() throws Exception {
+    super.testBasicAppendEntries(true);
+  }
+
   @Override
   @Test
   public void testWithLoad() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 2647d8f..89c40d0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -47,6 +47,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -112,6 +113,10 @@ public abstract class RaftBasicTests extends BaseTest {
 
   @Test
   public void testBasicAppendEntries() throws Exception {
+    testBasicAppendEntries(false);
+  }
+
+  protected void testBasicAppendEntries(boolean async) throws Exception {
     LOG.info("Running testBasicAppendEntries");
     final MiniRaftCluster cluster = getCluster();
     RaftServerImpl leader = waitForLeader(cluster);
@@ -121,9 +126,28 @@ public abstract class RaftBasicTests extends BaseTest {
     LOG.info(cluster.printServers());
 
     final SimpleMessage[] messages = SimpleMessage.create(10);
-    try(final RaftClient client = cluster.createClient()) {
+
+    try (final RaftClient client = cluster.createClient()) {
+      final AtomicInteger asyncReplyCount = new AtomicInteger();
+      final CompletableFuture<Void> f = new CompletableFuture<>();
+
       for (SimpleMessage message : messages) {
-        client.send(message);
+        if (async) {
+          client.sendAsync(message).thenAcceptAsync(reply -> {
+            if (!reply.isSuccess()) {
+              f.completeExceptionally(
+                  new AssertionError("Failed with reply " + reply));
+            } else if (asyncReplyCount.incrementAndGet() == messages.length) {
+              f.complete(null);
+            }
+          });
+        } else {
+          client.send(message);
+        }
+      }
+      if (async) {
+        f.join();
+        Assert.assertEquals(messages.length, asyncReplyCount.get());
       }
     }
 
@@ -131,7 +155,7 @@ public abstract class RaftBasicTests extends BaseTest {
     LOG.info(cluster.printAllLogs());
 
     cluster.getServerAliveStream().map(s -> s.getState().getLog())
-        .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages));
+        .forEach(log -> RaftTestUtil.assertLogEntries(log, async, term, messages));
   }
 
   @Test
@@ -171,7 +195,7 @@ public abstract class RaftBasicTests extends BaseTest {
     Assert.assertEquals(followerToSendLog.getId(), newLeaderId);
 
     cluster.getServerAliveStream().map(s -> s.getState().getLog())
-        .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages));
+        .forEach(log -> RaftTestUtil.assertLogEntries(log, false, term, messages));
     LOG.info("terminating testOldLeaderCommit test");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index d1e614c..c8dfc0d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -165,7 +165,7 @@ public interface RaftTestUtil {
     }
   }
 
-  static void assertLogEntries(RaftLog log, long expectedTerm,
+  static void assertLogEntries(RaftLog log, boolean async, long expectedTerm,
       SimpleMessage... expectedMessages) {
 
     final TermIndex[] termIndices = log.getEntries(1, Long.MAX_VALUE);
@@ -189,6 +189,11 @@ public interface RaftTestUtil {
       }
     }
 
+    if (async) {
+      Collections.sort(entries, Comparator
+          .comparing(e -> e.getSmLogEntry().getData().toStringUtf8()));
+    }
+
     long logIndex = 0;
     Assert.assertEquals(expectedMessages.length, entries.size());
     for (int i = 0; i < expectedMessages.length; i++) {
@@ -197,7 +202,9 @@ public interface RaftTestUtil {
       if (e.getTerm() > expectedTerm) {
         expectedTerm = e.getTerm();
       }
-      Assert.assertTrue(e.getIndex() > logIndex);
+      if (!async) {
+        Assert.assertTrue(e.getIndex() > logIndex);
+      }
       logIndex = e.getIndex();
       Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(),
           e.getSmLogEntry().getData().toByteArray());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/86e744c1/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
index c1136b7..5b7f13e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestRaftWithSimulatedRpc.java
@@ -22,6 +22,7 @@ import org.apache.ratis.RaftBasicTests;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.util.LogUtils;
+import org.junit.Test;
 
 import java.io.IOException;
 
@@ -42,4 +43,9 @@ public class TestRaftWithSimulatedRpc extends RaftBasicTests {
   public MiniRaftClusterWithSimulatedRpc getCluster() {
     return cluster;
   }
+
+  @Test
+  public void testBasicAppendEntriesAsync() throws Exception {
+    super.testBasicAppendEntries(true);
+  }
 }

Reply via email to