This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 449a272  RATIS-1094. Move async methods from RaftClient to a new 
AsyncApi interface (#220). Contributed by Rui Wang
449a272 is described below

commit 449a272debc1a3aebea6ecbd40ac6356d23ba855
Author: Rui Wang <[email protected]>
AuthorDate: Mon Oct 12 21:08:17 2020 -0700

    RATIS-1094. Move async methods from RaftClient to a new AsyncApi interface 
(#220). Contributed by Rui Wang
---
 .../java/org/apache/ratis/client/RaftClient.java   | 24 ++--------
 .../java/org/apache/ratis/client/api/AsyncApi.java | 48 +++++++++++++++++++
 .../org/apache/ratis/client/impl/AsyncImpl.java    | 56 ++++++++++++++++++++++
 .../apache/ratis/client/impl/RaftClientImpl.java   | 28 +++--------
 .../apache/ratis/client/impl/RaftOutputStream.java |  4 +-
 .../ratis/examples/filestore/FileStoreClient.java  |  4 +-
 .../org/apache/ratis/RaftAsyncExceptionTests.java  |  6 +--
 .../test/java/org/apache/ratis/RaftAsyncTests.java | 28 +++++------
 .../test/java/org/apache/ratis/RaftBasicTests.java |  8 ++--
 .../apache/ratis/RequestLimitAsyncBaseTest.java    | 10 ++--
 .../java/org/apache/ratis/WatchRequestTests.java   | 17 +++----
 .../apache/ratis/grpc/TestGrpcMessageMetrics.java  |  3 +-
 .../apache/ratis/grpc/TestLogAppenderWithGrpc.java |  2 +-
 .../apache/ratis/grpc/TestRaftServerWithGrpc.java  | 20 ++++----
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java    |  2 +-
 .../ratis/retry/TestExceptionDependentRetry.java   |  4 +-
 16 files changed, 170 insertions(+), 94 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 15549a8..7794817 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.client;
 
 import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.client.api.AsyncApi;
 import org.apache.ratis.client.api.DataStreamApi;
 import org.apache.ratis.client.api.GroupManagementApi;
 import org.apache.ratis.client.api.MessageStreamApi;
@@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
 
 /** A client who sends requests to a raft service. */
 public interface RaftClient extends Closeable {
@@ -54,6 +54,9 @@ public interface RaftClient extends Closeable {
   /** Get the {@link GroupManagementApi} for the given server. */
   GroupManagementApi getGroupManagementApi(RaftPeerId server);
 
+  /** Get the {@link AsyncApi}. */
+  AsyncApi async();
+
   /** @return the {@link MessageStreamApi}. */
   MessageStreamApi getMessageStreamApi();
 
@@ -65,25 +68,6 @@ public interface RaftClient extends Closeable {
   }
 
   /**
-   * Async call to send the given message to the raft service.
-   * The message may change the state of the service.
-   * For readonly messages, use {@link #sendReadOnlyAsync(Message)} instead.
-   *
-   * @param message The request message.
-   * @return a future of the reply.
-   */
-  CompletableFuture<RaftClientReply> sendAsync(Message message);
-
-  /** Async call to send the given readonly message to the raft service. */
-  CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message);
-
-  /** Async call to send the given stale-read message to the given server (not 
the raft service). */
-  CompletableFuture<RaftClientReply> sendStaleReadAsync(Message message, long 
minIndex, RaftPeerId server);
-
-  /** Async call to watch the given index to satisfy the given replication 
level. */
-  CompletableFuture<RaftClientReply> sendWatchAsync(long index, 
ReplicationLevel replication);
-
-  /**
    * Send the given message to the raft service.
    * The message may change the state of the service.
    * For readonly messages, use {@link #sendReadOnly(Message)} instead.
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java 
b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
new file mode 100644
index 0000000..de8320e
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.api;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+
+/**
+ * APIs to support asynchronous operations such as send message, send 
(stale)read message and watch request.
+ */
+public interface AsyncApi {
+  /**
+   * Async call to send the given message to the raft service.
+   * The message may change the state of the service.
+   * For readonly messages, use {@link #sendReadOnly(Message)} instead.
+   *
+   * @param message The request message.
+   * @return a future of the reply.
+   */
+  CompletableFuture<RaftClientReply> send(Message message);
+
+  /** Async call to send the given readonly message to the raft service. */
+  CompletableFuture<RaftClientReply> sendReadOnly(Message message);
+
+  /** Async call to send the given stale-read message to the given server (not 
the raft service). */
+  CompletableFuture<RaftClientReply> sendStaleRead(Message message, long 
minIndex, RaftPeerId server);
+
+  /** Async call to watch the given index to satisfy the given replication 
level. */
+  CompletableFuture<RaftClientReply> watch(long index, ReplicationLevel 
replication);
+}
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
new file mode 100644
index 0000000..09c3ae5
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ratis.client.api.AsyncApi;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
+
+/** Async api implementations. */
+class AsyncImpl implements AsyncApi {
+  private final RaftClientImpl client;
+
+  AsyncImpl(RaftClientImpl client) {
+    this.client = Objects.requireNonNull(client, "client == null");
+  }
+
+  @Override
+  public CompletableFuture<RaftClientReply> send(Message message) {
+    return client.sendAsync(RaftClientRequest.writeRequestType(), message, 
null);
+  }
+
+  @Override
+  public CompletableFuture<RaftClientReply> sendReadOnly(Message message) {
+    return client.sendAsync(RaftClientRequest.readRequestType(), message, 
null);
+  }
+
+  @Override
+  public CompletableFuture<RaftClientReply> sendStaleRead(Message message, 
long minIndex, RaftPeerId server) {
+    return client.sendAsync(RaftClientRequest.staleReadRequestType(minIndex), 
message, server);
+  }
+
+  @Override
+  public CompletableFuture<RaftClientReply> watch(long index, ReplicationLevel 
replication) {
+    return UnorderedAsync.send(RaftClientRequest.watchRequestType(index, 
replication), client);
+  }
+}
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index f9f985f..25b4eab 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.client.impl;
 
+import org.apache.ratis.client.api.AsyncApi;
 import org.apache.ratis.client.api.GroupManagementApi;
 import org.apache.ratis.client.retry.ClientRetryEvent;
 import org.apache.ratis.client.RaftClient;
@@ -186,26 +187,6 @@ public final class RaftClientImpl implements RaftClient {
     return streamApi.get();
   }
 
-  @Override
-  public CompletableFuture<RaftClientReply> sendAsync(Message message) {
-    return sendAsync(RaftClientRequest.writeRequestType(), message, null);
-  }
-
-  @Override
-  public CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message) 
{
-    return sendAsync(RaftClientRequest.readRequestType(), message, null);
-  }
-
-  @Override
-  public CompletableFuture<RaftClientReply> sendStaleReadAsync(Message 
message, long minIndex, RaftPeerId server) {
-    return sendAsync(RaftClientRequest.staleReadRequestType(minIndex), 
message, server);
-  }
-
-  @Override
-  public CompletableFuture<RaftClientReply> sendWatchAsync(long index, 
ReplicationLevel replication) {
-    return UnorderedAsync.send(RaftClientRequest.watchRequestType(index, 
replication), this);
-  }
-
   CompletableFuture<RaftClientReply> streamAsync(long streamId, long 
messageId, Message message, boolean endOfRequest) {
     return sendAsync(RaftClientRequest.streamRequestType(streamId, messageId, 
endOfRequest), message, null);
   }
@@ -214,7 +195,7 @@ public final class RaftClientImpl implements RaftClient {
     return sendAsync(RaftClientRequest.streamRequestType(streamId, messageId, 
true), null, null);
   }
 
-  private CompletableFuture<RaftClientReply> sendAsync(
+  CompletableFuture<RaftClientReply> sendAsync(
       RaftClientRequest.Type type, Message message, RaftPeerId server) {
     return getOrderedAsync().send(type, message, server);
   }
@@ -271,6 +252,11 @@ public final class RaftClientImpl implements RaftClient {
   }
 
   @Override
+  public AsyncApi async() {
+    return new AsyncImpl(this);
+  }
+
+  @Override
   public GroupManagementApi getGroupManagementApi(RaftPeerId server) {
     return new GroupManagementImpl(server, this);
   }
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftOutputStream.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftOutputStream.java
index 1cdbeac..2286d4c 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftOutputStream.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftOutputStream.java
@@ -32,7 +32,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
-/** An {@link OutputStream} implementation using {@link 
RaftClient#sendAsync(Message)} API. */
+/** An {@link OutputStream} implementation using {@link 
org.apache.ratis.client.api.AsyncApi#send(Message)} API. */
 public class RaftOutputStream extends OutputStream {
   private final Supplier<RaftClient> client;
   private final AtomicBoolean closed = new AtomicBoolean();
@@ -87,7 +87,7 @@ public class RaftOutputStream extends OutputStream {
       return;
     }
 
-    final CompletableFuture<Long> f = getClient().sendAsync(
+    final CompletableFuture<Long> f = getClient().async().send(
         Message.valueOf(ProtoUtils.toByteString(buffer, 0, byteCount))
     ).thenApply(reply -> RaftClientImpl.handleRaftException(reply, 
CompletionException::new)
     ).thenApply(reply -> reply != null && reply.isSuccess()? pos: null);
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index 639cfdb..f6269bb 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -103,11 +103,11 @@ public class FileStoreClient implements Closeable {
   }
 
   private CompletableFuture<ByteString> sendAsync(ByteString request) {
-    return sendAsync(request, client::sendAsync);
+    return sendAsync(request, client.async()::send);
   }
 
   private CompletableFuture<ByteString> sendReadOnlyAsync(ByteString request) {
-    return sendAsync(request, client::sendReadOnlyAsync);
+    return sendAsync(request, client.async()::sendReadOnly);
   }
 
   public ByteString read(String path, long offset, long length) throws 
IOException {
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
index 09ddec8..23b63fe 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
@@ -59,7 +59,7 @@ public abstract class RaftAsyncExceptionTests<CLUSTER extends 
MiniRaftCluster>
   private void runTestGroupMismatchException(CLUSTER cluster) throws Exception 
{
     // send a message to make sure the cluster is working
     try(RaftClient client = cluster.createClient()) {
-      final RaftClientReply reply = client.sendAsync(new 
SimpleMessage("first")).get();
+      final RaftClientReply reply = client.async().send(new 
SimpleMessage("first")).get();
       Assert.assertTrue(reply.isSuccess());
     }
 
@@ -74,7 +74,7 @@ public abstract class RaftAsyncExceptionTests<CLUSTER extends 
MiniRaftCluster>
       // send a few messages
       final List<CompletableFuture<RaftClientReply>> futures = new 
ArrayList<>();
       for(SimpleMessage m : messages) {
-        futures.add(client.sendAsync(m));
+        futures.add(client.async().send(m));
       }
       Assert.assertEquals(messages.length, futures.size());
 
@@ -108,7 +108,7 @@ public abstract class RaftAsyncExceptionTests<CLUSTER 
extends MiniRaftCluster>
           .map(cluster::getRaftServerImpl)
           .map(SimpleStateMachine4Testing::get)
           .forEach(SimpleStateMachine4Testing::blockStartTransaction);
-      final CompletableFuture<RaftClientReply> replyFuture = 
client.sendAsync(new SimpleMessage("m1"));
+      final CompletableFuture<RaftClientReply> replyFuture = 
client.async().send(new SimpleMessage("m1"));
       FIVE_SECONDS.sleep();
       // Unblock StartTransaction
       cluster.getServers().stream()
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 552f049..d78fbdc 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -134,7 +134,7 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
         final SimpleMessage[] messages = SimpleMessage.create(10, "initial-");
         final List<CompletableFuture<RaftClientReply>> replies = new 
ArrayList<>();
         for (int i = 0; i < messages.length; i++) {
-          replies.add(client.sendAsync(messages[i]));
+          replies.add(client.async().send(messages[i]));
         }
         for (int i = 0; i < messages.length; i++) {
           RaftTestUtil.assertSuccessReply(replies.get(i));
@@ -151,7 +151,7 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
         int i = 0;
         // send half of the calls without starting the cluster
         for (; i < messages.length/2; i++) {
-          replies.add(client.sendAsync(messages[i]));
+          replies.add(client.async().send(messages[i]));
         }
 
         // sleep most of the retry time
@@ -159,7 +159,7 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
 
         // send another half of the calls without starting the cluster
         for (; i < messages.length; i++) {
-          replies.add(client.sendAsync(messages[i]));
+          replies.add(client.async().send(messages[i]));
         }
         Assert.assertEquals(messages.length, replies.size());
       }
@@ -188,7 +188,7 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
         }
       }
 
-      testFailureCaseAsync("last-request", () -> client.sendAsync(new 
SimpleMessage("last")),
+      testFailureCaseAsync("last-request", () -> client.async().send(new 
SimpleMessage("last")),
           AlreadyClosedException.class, RaftRetryFailureException.class);
     }
   }
@@ -215,14 +215,14 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
       AtomicInteger blockedRequestsCount = new AtomicInteger();
       for (int i = 0; i < numMessages; i++) {
         blockedRequestsCount.getAndIncrement();
-        futures[i] = client.sendAsync(messages[i]);
+        futures[i] = client.async().send(messages[i]);
         blockedRequestsCount.decrementAndGet();
       }
       Assert.assertEquals(0, blockedRequestsCount.get());
 
       futures[numMessages] = CompletableFuture.supplyAsync(() -> {
         blockedRequestsCount.incrementAndGet();
-        client.sendAsync(new SimpleMessage("n1"));
+        client.async().send(new SimpleMessage("n1"));
         blockedRequestsCount.decrementAndGet();
         return null;
       });
@@ -284,7 +284,7 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
       for (int i = 0; i < numMesssages; i++) {
         final String s = "" + i;
         LOG.info("sendAsync " + s);
-        futures.add(client.sendAsync(new SimpleMessage(s)));
+        futures.add(client.async().send(new SimpleMessage(s)));
       }
       Assert.assertEquals(numMesssages, futures.size());
       final List<RaftClientReply> replies = new ArrayList<>();
@@ -310,7 +310,7 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
 
       // test a failure case
       testFailureCaseAsync("sendStaleReadAsync(..) with a larger commit index",
-          () -> client.sendStaleReadAsync(
+          () -> client.async().sendStaleRead(
               new SimpleMessage("" + Long.MAX_VALUE),
               followerCommitInfo.getCommitIndex(), follower),
           StateMachineException.class, IndexOutOfBoundsException.class);
@@ -321,12 +321,12 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
         final String query = "" + i;
         LOG.info("query=" + query + ", reply=" + reply);
         final Message message = new SimpleMessage(query);
-        final CompletableFuture<RaftClientReply> readFuture = 
client.sendReadOnlyAsync(message);
+        final CompletableFuture<RaftClientReply> readFuture = 
client.async().sendReadOnly(message);
 
         futures.add(readFuture.thenCompose(r -> {
           if (reply.getLogIndex() <= followerCommitIndex) {
             LOG.info("sendStaleReadAsync, query=" + query);
-            return client.sendStaleReadAsync(message, followerCommitIndex, 
follower);
+            return client.async().sendStaleRead(message, followerCommitIndex, 
follower);
           } else {
             return CompletableFuture.completedFuture(null);
           }
@@ -389,7 +389,7 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
           .map(SimpleStateMachine4Testing::get)
           .forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
 
-      CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new 
SimpleMessage("abc"));
+      CompletableFuture<RaftClientReply> replyFuture = client.async().send(new 
SimpleMessage("abc"));
       Thread.sleep(waitTime);
       // replyFuture should not be completed until append request is unblocked.
       Assert.assertFalse(replyFuture.isDone());
@@ -428,7 +428,7 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
           .forEach(peer -> logSyncDelay.setDelayMs(peer.getId().toString(), 
1000));
 
       // trigger append entries request
-      client.sendAsync(new SimpleMessage("abc"));
+      client.async().send(new SimpleMessage("abc"));
 
       // default max election timeout is 300ms, 1s is long enough to
       // trigger failure of LeaderState::checkLeadership()
@@ -470,7 +470,7 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
 
     // send a message to make sure that the leader is ready
     try (final RaftClient client = cluster.createClient(leader.getId())) {
-      final CompletableFuture<RaftClientReply> f = client.sendAsync(new 
SimpleMessage("first"));
+      final CompletableFuture<RaftClientReply> f = client.async().send(new 
SimpleMessage("first"));
       FIVE_SECONDS.apply(f::get);
     }
 
@@ -478,7 +478,7 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
     final RetryPolicy r = event -> () -> TimeDuration.valueOf(60, 
TimeUnit.SECONDS);
 
     try (final RaftClient client = 
cluster.createClient(followers.get(0).getId(), cluster.getGroup(), r)) {
-      final CompletableFuture<RaftClientReply> f = client.sendAsync(new 
SimpleMessage("abc"));
+      final CompletableFuture<RaftClientReply> f = client.async().send(new 
SimpleMessage("abc"));
       FIVE_SECONDS.apply(f::get);
     } catch (TimeoutException e) {
       throw new AssertionError("Failed to get async result", e);
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index c17004e..7e1e47e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -138,7 +138,7 @@ public abstract class RaftBasicTests<CLUSTER extends 
MiniRaftCluster>
 
       for (SimpleMessage message : messages) {
         if (async) {
-          client.sendAsync(message).thenAcceptAsync(reply -> {
+          client.async().send(message).thenAcceptAsync(reply -> {
             if (!reply.isSuccess()) {
               f.completeExceptionally(
                   new AssertionError("Failed with reply " + reply));
@@ -292,7 +292,7 @@ public abstract class RaftBasicTests<CLUSTER extends 
MiniRaftCluster>
             Assert.assertTrue(reply.isSuccess());
           } else {
             final CompletableFuture<RaftClientReply> replyFuture =
-                client.sendAsync(messages[i]);
+                client.async().send(messages[i]);
             replyFuture.thenAcceptAsync(r -> {
               if (!r.isSuccess()) {
                 f.completeExceptionally(
@@ -433,7 +433,7 @@ public abstract class RaftBasicTests<CLUSTER extends 
MiniRaftCluster>
       // Ideally the client request should timeout and the client should retry.
       // The retry is successful when the retry cache entry for the 
corresponding callId and clientId expires.
       if (async) {
-        CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new 
SimpleMessage("abc"));
+        CompletableFuture<RaftClientReply> replyFuture = 
client.async().send(new SimpleMessage("abc"));
         replyFuture.get();
       } else {
         client.send(new SimpleMessage("abc"));
@@ -464,7 +464,7 @@ public abstract class RaftBasicTests<CLUSTER extends 
MiniRaftCluster>
       checkFollowerCommitLagsLeader(cluster);
 
       if (async) {
-        CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new 
SimpleMessage("abc"));
+        CompletableFuture<RaftClientReply> replyFuture = 
client.async().send(new SimpleMessage("abc"));
         replyFuture.get();
       } else {
         client.send(new SimpleMessage("abc"));
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java 
b/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
index cd6a312..df7ba15 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
@@ -71,7 +71,7 @@ public abstract class RequestLimitAsyncBaseTest<CLUSTER 
extends MiniRaftCluster>
     try (RaftClient c1 = cluster.createClient(leader.getId())) {
       { // send first message to make sure the cluster is working
         final SimpleMessage message = new SimpleMessage("first");
-        final CompletableFuture<RaftClientReply> future = 
c1.sendAsync(message);
+        final CompletableFuture<RaftClientReply> future = 
c1.async().send(message);
         final RaftClientReply reply = getWithDefaultTimeout(future);
         Assert.assertTrue(reply.isSuccess());
       }
@@ -84,13 +84,13 @@ public abstract class RequestLimitAsyncBaseTest<CLUSTER 
extends MiniRaftCluster>
       final List<CompletableFuture<RaftClientReply>> writeFutures = new 
ArrayList<>();
       for (int i = 0; i < writeElementLimit; i++) {
         final SimpleMessage message = new SimpleMessage("m" + i);
-        writeFutures.add(c1.sendAsync(message));
+        writeFutures.add(c1.async().send(message));
       }
 
       // send watch requests up to the limit
       final long watchBase = 1000; //watch a large index so that it won't 
complete
       for (int i = 0; i < watchElementLimit; i++) {
-        c1.sendWatchAsync(watchBase + i, ReplicationLevel.ALL);
+        c1.async().watch(watchBase + i, ReplicationLevel.ALL);
       }
 
       // sleep to make sure that all the request were sent
@@ -101,14 +101,14 @@ public abstract class RequestLimitAsyncBaseTest<CLUSTER 
extends MiniRaftCluster>
         final SimpleMessage message = new SimpleMessage("err");
         testFailureCase("send should fail", () -> c2.send(message),
             ResourceUnavailableException.class);
-        testFailureCase("sendAsync should fail", () -> 
c2.sendAsync(message).get(),
+        testFailureCase("sendAsync should fail", () -> 
c2.async().send(message).get(),
             ExecutionException.class, ResourceUnavailableException.class);
 
         // more watch requests should get ResourceUnavailableException
         final long watchIndex = watchBase + watchElementLimit;
         testFailureCase("sendWatch should fail", () -> 
c2.sendWatch(watchIndex, ReplicationLevel.ALL),
             ResourceUnavailableException.class);
-        testFailureCase("sendWatchAsync should fail", () -> 
c2.sendWatchAsync(watchIndex, ReplicationLevel.ALL).get(),
+        testFailureCase("sendWatchAsync should fail", () -> 
c2.async().watch(watchIndex, ReplicationLevel.ALL).get(),
             ExecutionException.class, ResourceUnavailableException.class);
       }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java 
b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 368b871..47f11a8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -98,7 +98,8 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
       for(int i = 0; i < numMessages; i++) {
         final String message = "m" + i;
         log.info("SEND_REQUEST {}: message={}", i, message);
-        final CompletableFuture<RaftClientReply> replyFuture = 
writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message));
+        final CompletableFuture<RaftClientReply> replyFuture =
+            writeClient.async().send(new RaftTestUtil.SimpleMessage(message));
         replies.add(replyFuture);
         final CompletableFuture<WatchReplies> watchFuture = new 
CompletableFuture<>();
         watches.add(watchFuture);
@@ -106,10 +107,10 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
           final long logIndex = reply.getLogIndex();
           log.info("SEND_WATCH: message={}, logIndex={}", message, logIndex);
           watchFuture.complete(new WatchReplies(logIndex,
-              writeClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY),
-              writeClient.sendWatchAsync(logIndex, ReplicationLevel.ALL),
-              writeClient.sendWatchAsync(logIndex, 
ReplicationLevel.MAJORITY_COMMITTED),
-              writeClient.sendWatchAsync(logIndex, 
ReplicationLevel.ALL_COMMITTED),
+              writeClient.async().watch(logIndex, ReplicationLevel.MAJORITY),
+              writeClient.async().watch(logIndex, ReplicationLevel.ALL),
+              writeClient.async().watch(logIndex, 
ReplicationLevel.MAJORITY_COMMITTED),
+              writeClient.async().watch(logIndex, 
ReplicationLevel.ALL_COMMITTED),
               log));
         });
       }
@@ -122,11 +123,11 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
           cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId(), 
policy)) {
 
         CompletableFuture<RaftClientReply> reply =
-            watchClient.sendAsync(new RaftTestUtil.SimpleMessage("message"));
+            watchClient.async().send(new 
RaftTestUtil.SimpleMessage("message"));
         long writeIndex = reply.get().getLogIndex();
         Assert.assertTrue(writeIndex > 0);
-        watchClient.sendWatchAsync(writeIndex, 
ReplicationLevel.MAJORITY_COMMITTED);
-        return watchClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY);
+        watchClient.async().watch(writeIndex, 
ReplicationLevel.MAJORITY_COMMITTED);
+        return watchClient.async().watch(logIndex, ReplicationLevel.MAJORITY);
       }
 
     }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
index 1757874..be1e3ac 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
@@ -55,7 +55,8 @@ public class TestGrpcMessageMetrics extends BaseTest
   static void sendMessages(MiniRaftCluster cluster) throws Exception {
     waitForLeader(cluster);
     try (final RaftClient client = cluster.createClient()) {
-      CompletableFuture<RaftClientReply> replyFuture =  client.sendAsync(new 
RaftTestUtil.SimpleMessage("abc"));
+      CompletableFuture<RaftClientReply> replyFuture =
+          client.async().send(new RaftTestUtil.SimpleMessage("abc"));
     }
     // Wait for commits to happen on leader
     JavaUtils.attempt(() -> assertMessageCount(cluster.getLeader()), 100, 
HUNDRED_MILLIS, cluster.getLeader().getId() + "-assertMessageCount", null);
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
index 18dab32..6c37589 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -73,7 +73,7 @@ public class TestLogAppenderWithGrpc
       }
       Collection<CompletableFuture<RaftClientReply>> futures = new 
ArrayList<>(maxAppends * 2);
       for (int i = 0; i < maxAppends * 2; i++) {
-        futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage("m")));
+        futures.add(client.async().send(new RaftTestUtil.SimpleMessage("m")));
       }
 
       FIVE_SECONDS.sleep();
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 5d71f61..fda250c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -149,7 +149,7 @@ public class TestRaftServerWithGrpc extends BaseTest 
implements MiniRaftClusterW
 
     try (final RaftClient client = cluster.createClient()) {
       // send a request to make sure leader is ready
-      final CompletableFuture<RaftClientReply> f = client.sendAsync(new 
SimpleMessage("testing"));
+      final CompletableFuture<RaftClientReply> f = client.async().send(new 
SimpleMessage("testing"));
       Assert.assertTrue(f.get().isSuccess());
     }
 
@@ -211,7 +211,7 @@ public class TestRaftServerWithGrpc extends BaseTest 
implements MiniRaftClusterW
 
     try (RaftClient client = cluster.createClient()) {
       // send a request to make sure leader is ready
-      final CompletableFuture< RaftClientReply > f = client.sendAsync(new 
SimpleMessage("testing"));
+      final CompletableFuture< RaftClientReply > f = client.async().send(new 
SimpleMessage("testing"));
       Assert.assertTrue(f.get().isSuccess());
     }
 
@@ -226,7 +226,7 @@ public class TestRaftServerWithGrpc extends BaseTest 
implements MiniRaftClusterW
     try {
       RaftClient client = cluster.createClient(cluster.getLeader().getId(), 
RetryPolicies.noRetry());
       clients.add(client);
-      client.sendAsync(new SimpleMessage(message));
+      client.async().send(new SimpleMessage(message));
 
 
       final SortedMap<String, Gauge> gaugeMap =
@@ -239,7 +239,7 @@ public class TestRaftServerWithGrpc extends BaseTest 
implements MiniRaftClusterW
       for (int i = 0; i < 10; i++) {
         client = cluster.createClient(cluster.getLeader().getId(), 
RetryPolicies.noRetry());
         clients.add(client);
-        client.sendAsync(new SimpleMessage(message));
+        client.async().send(new SimpleMessage(message));
       }
 
       // Because we have passed 11 requests, and the element queue size is 10.
@@ -252,7 +252,7 @@ public class TestRaftServerWithGrpc extends BaseTest 
implements MiniRaftClusterW
       // and byte size counter limit will be hit.
 
       client = cluster.createClient(cluster.getLeader().getId(), 
RetryPolicies.noRetry());
-      client.sendAsync(new SimpleMessage(RandomStringUtils.random(120, true, 
false)));
+      client.async().send(new SimpleMessage(RandomStringUtils.random(120, 
true, false)));
       clients.add(client);
 
       RaftTestUtil.waitFor(() -> cluster.getLeader().getRaftServerMetrics()
@@ -274,24 +274,24 @@ public class TestRaftServerWithGrpc extends BaseTest 
implements MiniRaftClusterW
     RaftServerMetrics raftServerMetrics = leader.getRaftServerMetrics();
 
     try (final RaftClient client = cluster.createClient()) {
-      final CompletableFuture<RaftClientReply> f1 = client.sendAsync(new 
SimpleMessage("testing"));
+      final CompletableFuture<RaftClientReply> f1 = client.async().send(new 
SimpleMessage("testing"));
       Assert.assertTrue(f1.get().isSuccess());
       
Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_WRITE_REQUEST).getCount()
 > 0);
 
-      final CompletableFuture<RaftClientReply> f2 = 
client.sendReadOnlyAsync(new SimpleMessage("testing"));
+      final CompletableFuture<RaftClientReply> f2 = 
client.async().sendReadOnly(new SimpleMessage("testing"));
       Assert.assertTrue(f2.get().isSuccess());
       
Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_READ_REQUEST).getCount()
 > 0);
 
-      final CompletableFuture<RaftClientReply> f3 = 
client.sendStaleReadAsync(new SimpleMessage("testing"),
+      final CompletableFuture<RaftClientReply> f3 = 
client.async().sendStaleRead(new SimpleMessage("testing"),
           0, leader.getId());
       Assert.assertTrue(f3.get().isSuccess());
       
Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_STALE_READ_REQUEST).getCount()
 > 0);
 
-      final CompletableFuture<RaftClientReply> f4 = client.sendWatchAsync(0, 
RaftProtos.ReplicationLevel.ALL);
+      final CompletableFuture<RaftClientReply> f4 = client.async().watch(0, 
RaftProtos.ReplicationLevel.ALL);
       Assert.assertTrue(f4.get().isSuccess());
       
Assert.assertTrue(raftServerMetrics.getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST,
 "-ALL")).getCount() > 0);
 
-      final CompletableFuture<RaftClientReply> f5 = client.sendWatchAsync(0, 
RaftProtos.ReplicationLevel.MAJORITY);
+      final CompletableFuture<RaftClientReply> f5 = client.async().watch(0, 
RaftProtos.ReplicationLevel.MAJORITY);
       Assert.assertTrue(f5.get().isSuccess());
       
Assert.assertTrue(raftServerMetrics.getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST,
 "")).getCount() > 0);
     }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 0f1d71d..915221b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -80,7 +80,7 @@ public class TestRaftWithGrpc
           .forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
 
       CompletableFuture<RaftClientReply>
-          replyFuture = client.sendAsync(new 
RaftTestUtil.SimpleMessage("abc"));
+          replyFuture = client.async().send(new 
RaftTestUtil.SimpleMessage("abc"));
       TimeDuration.valueOf(5 , TimeUnit.SECONDS).sleep();
       // replyFuture should not be completed until append request is unblocked.
       Assert.assertTrue(!replyFuture.isDone());
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
 
b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
index 8fa02c9..66fbd71 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
@@ -185,12 +185,12 @@ public class TestExceptionDependentRetry implements 
MiniRaftClusterWithGrpc.Fact
 
       // create a client with the exception dependent policy
       try (final RaftClient client = cluster.createClient(builder.build())) {
-        client.sendAsync(new RaftTestUtil.SimpleMessage("1")).get();
+        client.async().send(new RaftTestUtil.SimpleMessage("1")).get();
 
         leader = cluster.getLeader();
         ((SimpleStateMachine4Testing) 
leader.getStateMachine()).blockWriteStateMachineData();
 
-        client.sendAsync(new RaftTestUtil.SimpleMessage("2")).get();
+        client.async().send(new RaftTestUtil.SimpleMessage("2")).get();
       }
       Assert.fail("Test should have failed.");
     } catch (ExecutionException e) {

Reply via email to