This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f78403a  RATIS-1093. Move blocking methods from RaftClient to a new BlockingApi interface (#219).  Contributed by Rui Wang
f78403a is described below

commit f78403a08700db47af7204da2f9ce46c2febc0ff
Author: Rui Wang <[email protected]>
AuthorDate: Mon Oct 12 22:51:31 2020 -0700

    RATIS-1093. Move blocking methods from RaftClient to a new BlockingApi interface (#219).  Contributed by Rui Wang
---
 .../java/org/apache/ratis/client/RaftClient.java   | 24 ++-------
 .../org/apache/ratis/client/api/BlockingApi.java   | 48 ++++++++++++++++++
 .../org/apache/ratis/client/impl/BlockingImpl.java | 57 ++++++++++++++++++++++
 .../apache/ratis/client/impl/RaftClientImpl.java   | 29 +++--------
 .../ratis/examples/arithmetic/cli/Assign.java      |  2 +-
 .../apache/ratis/examples/arithmetic/cli/Get.java  |  2 +-
 .../examples/counter/client/CounterClient.java     |  4 +-
 .../ratis/examples/filestore/FileStoreClient.java  |  4 +-
 .../ratis/examples/arithmetic/TestArithmetic.java  |  2 +-
 .../apache/ratis/examples/counter/TestCounter.java | 12 ++---
 .../ratis/logservice/api/LogServiceClient.java     | 15 +++---
 .../ratis/logservice/impl/LogReaderImpl.java       |  8 +--
 .../ratis/logservice/impl/LogStreamImpl.java       | 10 ++--
 .../ratis/logservice/impl/LogWriterImpl.java       |  4 +-
 .../apache/ratis/logservice/server/LogServer.java  |  4 +-
 .../ratis/logservice/server/LogStateMachine.java   |  6 +--
 .../ratis/logservice/server/MetaStateMachine.java  |  6 +--
 .../ratis/InstallSnapshotNotificationTests.java    |  8 +--
 .../java/org/apache/ratis/LogAppenderTests.java    |  4 +-
 .../org/apache/ratis/MessageStreamApiTests.java    |  6 +--
 .../java/org/apache/ratis/MiniRaftCluster.java     |  2 +-
 .../org/apache/ratis/RaftAsyncExceptionTests.java  |  2 +-
 .../test/java/org/apache/ratis/RaftBasicTests.java |  8 +--
 .../org/apache/ratis/RaftExceptionBaseTest.java    | 10 ++--
 .../test/java/org/apache/ratis/RaftTestUtil.java   |  2 +-
 .../apache/ratis/RequestLimitAsyncBaseTest.java    |  4 +-
 .../ratis/server/impl/GroupInfoBaseTest.java       |  2 +-
 .../ratis/server/impl/GroupManagementBaseTest.java |  2 +-
 .../ratis/server/impl/LeaderElectionTests.java     |  4 +-
 .../server/impl/RaftReconfigurationBaseTest.java   |  6 +--
 .../impl/RaftStateMachineExceptionTests.java       |  2 +-
 .../server/impl/StateMachineShutdownTests.java     |  6 +--
 .../ratis/statemachine/RaftSnapshotBaseTest.java   |  6 +--
 .../apache/ratis/grpc/TestLogAppenderWithGrpc.java |  6 +--
 .../apache/ratis/server/ServerRestartTests.java    | 12 ++---
 .../ratis/server/raftlog/TestRaftLogMetrics.java   |  4 +-
 .../ratis/statemachine/TestStateMachine.java       |  2 +-
 37 files changed, 203 insertions(+), 132 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 7794817..c5decd6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -19,6 +19,7 @@ package org.apache.ratis.client;
 
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.api.AsyncApi;
+import org.apache.ratis.client.api.BlockingApi;
 import org.apache.ratis.client.api.DataStreamApi;
 import org.apache.ratis.client.api.GroupManagementApi;
 import org.apache.ratis.client.api.MessageStreamApi;
@@ -29,7 +30,6 @@ import org.apache.ratis.protocol.*;
 import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.util.JavaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +60,9 @@ public interface RaftClient extends Closeable {
   /** @return the {@link MessageStreamApi}. */
   MessageStreamApi getMessageStreamApi();
 
+  /** @return the {@link BlockingApi}. */
+  BlockingApi io();
+
   /** @return the {@link DataStreamApi}. */
   default DataStreamApi getDataStreamApi() {
     // TODO RATIS-1090: Implements this once the streaming feature has become 
usable.
@@ -67,25 +70,6 @@ public interface RaftClient extends Closeable {
         JavaUtils.getCurrentStackTraceElement().getMethodName() + " is not yet 
supported.");
   }
 
-  /**
-   * Send the given message to the raft service.
-   * The message may change the state of the service.
-   * For readonly messages, use {@link #sendReadOnly(Message)} instead.
-   *
-   * @param message The request message.
-   * @return the reply.
-   */
-  RaftClientReply send(Message message) throws IOException;
-
-  /** Send the given readonly message to the raft service. */
-  RaftClientReply sendReadOnly(Message message) throws IOException;
-
-  /** Send the given stale-read message to the given server (not the raft 
service). */
-  RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId 
server) throws IOException;
-
-  /** Watch the given index to satisfy the given replication level. */
-  RaftClientReply sendWatch(long index, ReplicationLevel replication) throws 
IOException;
-
   /** Send set configuration request to the raft service. */
   RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws 
IOException;
 
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java 
b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
new file mode 100644
index 0000000..dc14cd5
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.api;
+
+import java.io.IOException;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+
+/**
+ * APIs to support blocking operations such as send message, send (stale)read 
message and watch request.
+ */
+public interface BlockingApi {
+  /**
+   * Send the given message to the raft service.
+   * The message may change the state of the service.
+   * For readonly messages, use {@link #sendReadOnly(Message)} instead.
+   *
+   * @param message The request message.
+   * @return the reply.
+   */
+  RaftClientReply send(Message message) throws IOException;
+
+  /** Send the given readonly message to the raft service. */
+  RaftClientReply sendReadOnly(Message message) throws IOException;
+
+  /** Send the given stale-read message to the given server (not the raft 
service). */
+  RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId 
server) throws IOException;
+
+  /** Watch the given index to satisfy the given replication level. */
+  RaftClientReply watch(long index, ReplicationLevel replication) throws 
IOException;
+}
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
new file mode 100644
index 0000000..d73c227
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import java.io.IOException;
+import java.util.Objects;
+import org.apache.ratis.client.api.BlockingApi;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
+
+/** Blocking api implementations. */
+class BlockingImpl implements BlockingApi {
+  private final RaftClientImpl client;
+
+  BlockingImpl(RaftClientImpl client) {
+    this.client = Objects.requireNonNull(client, "client == null");
+  }
+
+  @Override
+  public RaftClientReply send(Message message) throws IOException {
+    return client.send(RaftClientRequest.writeRequestType(), message, null);
+  }
+
+  @Override
+  public RaftClientReply sendReadOnly(Message message) throws IOException {
+    return client.send(RaftClientRequest.readRequestType(), message, null);
+  }
+
+  @Override
+  public RaftClientReply sendStaleRead(Message message, long minIndex, 
RaftPeerId server)
+      throws IOException {
+    return client.send(RaftClientRequest.staleReadRequestType(minIndex), 
message, server);
+  }
+
+  @Override
+  public RaftClientReply watch(long index, ReplicationLevel replication) 
throws IOException {
+    return client.send(RaftClientRequest.watchRequestType(index, replication), 
null, null);
+  }
+}
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 25b4eab..9d096ce 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -25,7 +25,6 @@ import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.client.api.MessageStreamApi;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
-import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
@@ -207,28 +206,7 @@ public final class RaftClientImpl implements RaftClient {
         callId, message, type, slidingWindowEntry);
   }
 
-  @Override
-  public RaftClientReply send(Message message) throws IOException {
-    return send(RaftClientRequest.writeRequestType(), message, null);
-  }
-
-  @Override
-  public RaftClientReply sendReadOnly(Message message) throws IOException {
-    return send(RaftClientRequest.readRequestType(), message, null);
-  }
-
-  @Override
-  public RaftClientReply sendStaleRead(Message message, long minIndex, 
RaftPeerId server)
-      throws IOException {
-    return send(RaftClientRequest.staleReadRequestType(minIndex), message, 
server);
-  }
-
-  @Override
-  public RaftClientReply sendWatch(long index, ReplicationLevel replication) 
throws IOException {
-    return send(RaftClientRequest.watchRequestType(index, replication), null, 
null);
-  }
-
-  private RaftClientReply send(RaftClientRequest.Type type, Message message, 
RaftPeerId server)
+  RaftClientReply send(RaftClientRequest.Type type, Message message, 
RaftPeerId server)
       throws IOException {
     if (!type.is(TypeCase.WATCH)) {
       Objects.requireNonNull(message, "message == null");
@@ -261,6 +239,11 @@ public final class RaftClientImpl implements RaftClient {
     return new GroupManagementImpl(server, this);
   }
 
+  @Override
+  public BlockingImpl io() {
+    return new BlockingImpl(this);
+  }
+
   void addServers(Stream<RaftPeer> peersInNewConf) {
     clientRpc.addServers(
         peersInNewConf.filter(p -> !peers.contains(p))::iterator);
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Assign.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Assign.java
index 3ae8cd6..3c6d774 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Assign.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Assign.java
@@ -56,7 +56,7 @@ public class Assign extends Client {
 
   @Override
   protected void operation(RaftClient client) throws IOException {
-    RaftClientReply send = client.send(
+    RaftClientReply send = client.io().send(
         new AssignmentMessage(new Variable(name), createExpression(value)));
     System.out.println("Success: " + send.isSuccess());
     System.out.println("Response: " + send.getMessage().getClass());
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Get.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Get.java
index c274334..89050f1 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Get.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Get.java
@@ -40,7 +40,7 @@ public class Get extends Client {
   @Override
   protected void operation(RaftClient client) throws IOException {
     RaftClientReply getValue =
-        client.sendReadOnly(Expression.Utils.toMessage(new Variable(name)));
+        client.io().sendReadOnly(Expression.Utils.toMessage(new 
Variable(name)));
     Expression response =
         
Expression.Utils.bytes2Expression(getValue.getMessage().getContent().toByteArray(),
 0);
     System.out.println(String.format("%s=%s", name, (DoubleValue) 
response).toString());
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
index faa0cd8..eafdb03 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
@@ -62,7 +62,7 @@ public final class CounterClient {
     System.out.printf("Sending %d increment command...\n", increment);
     for (int i = 0; i < increment; i++) {
       executorService.submit(() ->
-          raftClient.send(Message.valueOf("INCREMENT")));
+          raftClient.io().send(Message.valueOf("INCREMENT")));
     }
 
     //shutdown the executor service and wait until they finish their work
@@ -70,7 +70,7 @@ public final class CounterClient {
     executorService.awaitTermination(increment * 500L, TimeUnit.MILLISECONDS);
 
     //send GET command and print the response
-    RaftClientReply count = raftClient.sendReadOnly(Message.valueOf("GET"));
+    RaftClientReply count = 
raftClient.io().sendReadOnly(Message.valueOf("GET"));
     String response = 
count.getMessage().getContent().toString(Charset.defaultCharset());
     System.out.println(response);
   }
diff --git 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index f6269bb..8ce4487 100644
--- 
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ 
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -95,11 +95,11 @@ public class FileStoreClient implements Closeable {
   }
 
   private ByteString send(ByteString request) throws IOException {
-    return send(request, client::send);
+    return send(request, client.io()::send);
   }
 
   private ByteString sendReadOnly(ByteString request) throws IOException {
-    return send(request, client::sendReadOnly);
+    return send(request, client.io()::sendReadOnly);
   }
 
   private CompletableFuture<ByteString> sendAsync(ByteString request) {
diff --git 
a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
 
b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
index f2cedce..da990fa 100644
--- 
a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
+++ 
b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
@@ -153,7 +153,7 @@ public class TestArithmetic extends ParameterizedBaseTest {
   }
 
   static Expression assign(RaftClient client, Variable x, Expression e, Double 
expected) throws IOException {
-    final RaftClientReply r = client.send(x.assign(e));
+    final RaftClientReply r = client.io().send(x.assign(e));
     return assertRaftClientReply(r, expected);
   }
 
diff --git 
a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
 
b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
index beb71be..d0aa86e 100644
--- 
a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
+++ 
b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
@@ -47,21 +47,21 @@ public class TestCounter extends ParameterizedBaseTest {
     setAndStart(cluster);
     try (final RaftClient client = cluster.createClient()) {
       for (int i = 0; i < 10; i++) {
-        client.send(Message.valueOf("INCREMENT"));
+        client.io().send(Message.valueOf("INCREMENT"));
       }
-      RaftClientReply reply1 = client.sendReadOnly(Message.valueOf("GET"));
+      RaftClientReply reply1 = 
client.io().sendReadOnly(Message.valueOf("GET"));
       Assert.assertEquals("10",
           reply1.getMessage().getContent().toString(Charset.defaultCharset()));
       for (int i = 0; i < 10; i++) {
-        client.send(Message.valueOf("INCREMENT"));
+        client.io().send(Message.valueOf("INCREMENT"));
       }
-      RaftClientReply reply2 = client.sendReadOnly(Message.valueOf("GET"));
+      RaftClientReply reply2 = 
client.io().sendReadOnly(Message.valueOf("GET"));
       Assert.assertEquals("20",
           reply2.getMessage().getContent().toString(Charset.defaultCharset()));
       for (int i = 0; i < 10; i++) {
-        client.send(Message.valueOf("INCREMENT"));
+        client.io().send(Message.valueOf("INCREMENT"));
       }
-      RaftClientReply reply3 = client.sendReadOnly(Message.valueOf("GET"));
+      RaftClientReply reply3 = 
client.io().sendReadOnly(Message.valueOf("GET"));
       Assert.assertEquals("30",
           reply3.getMessage().getContent().toString(Charset.defaultCharset()));
     }
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceClient.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceClient.java
index dd25967..559c263 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceClient.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceClient.java
@@ -93,7 +93,7 @@ public class LogServiceClient implements AutoCloseable {
      * @throws IOException
      */
     public LogStream createLog(LogName logName) throws IOException {
-        RaftClientReply reply = client.sendReadOnly(
+        RaftClientReply reply = client.io().sendReadOnly(
             () -> 
MetaServiceProtoUtil.toCreateLogRequestProto(logName).toByteString());
         CreateLogReplyProto message =
             CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
@@ -138,7 +138,7 @@ public class LogServiceClient implements AutoCloseable {
 
     public List<ArchivalInfo> getExportStatus(LogName logName) throws 
IOException {
         try (RaftClient client = getRaftClient(getLogInfo(logName))) {
-            RaftClientReply exportInfoReply = client.sendReadOnly(
+            RaftClientReply exportInfoReply = client.io().sendReadOnly(
                 () -> 
LogServiceProtoUtil.toExportInfoRequestProto(logName).toByteString());
             LogServiceProtos.GetExportInfoReplyProto message =
                 LogServiceProtos.GetExportInfoReplyProto
@@ -153,7 +153,7 @@ public class LogServiceClient implements AutoCloseable {
     }
 
     public void deleteLog(LogName logName) throws IOException {
-        RaftClientReply reply = client.sendReadOnly
+        RaftClientReply reply = client.io().sendReadOnly
                 (() -> 
MetaServiceProtoUtil.toDeleteLogRequestProto(logName).toByteString());
         DeleteLogReplyProto message = 
DeleteLogReplyProto.parseFrom(reply.getMessage().getContent());
         if(message.hasException()) {
@@ -167,7 +167,7 @@ public class LogServiceClient implements AutoCloseable {
      * @throws IOException
      */
     public List<LogInfo> listLogs() throws IOException {
-        RaftClientReply reply = client.sendReadOnly
+        RaftClientReply reply = client.io().sendReadOnly
                 (() -> 
MetaServiceProtoUtil.toListLogRequestProto().toByteString());
         ListLogsReplyProto message = 
ListLogsReplyProto.parseFrom(reply.getMessage().getContent());
         List<LogInfoProto> infoProtos = message.getLogsList();
@@ -197,7 +197,7 @@ public class LogServiceClient implements AutoCloseable {
     }
 
     private LogInfo getLogInfo(LogName logName) throws IOException {
-        RaftClientReply reply = client.sendReadOnly(
+        RaftClientReply reply = client.io().sendReadOnly(
             () -> 
MetaServiceProtoUtil.toGetLogRequestProto(logName).toByteString());
         GetLogReplyProto message = 
GetLogReplyProto.parseFrom(reply.getMessage().getContent());
         if (message.hasException()) {
@@ -234,7 +234,8 @@ public class LogServiceClient implements AutoCloseable {
      */
     public void exportLog(LogName logName, String location, long recordId) 
throws IOException {
         try (RaftClient client = getRaftClient(getLogInfo(logName))) {
-            RaftClientReply archiveLogReply = client.sendReadOnly(() -> 
LogServiceProtoUtil
+            RaftClientReply archiveLogReply =
+                client.io().sendReadOnly(() -> LogServiceProtoUtil
                 .toArchiveLogRequestProto(logName, location, recordId,
                     location == null ? true : false, 
ArchivalInfo.ArchivalStatus.SUBMITTED)
                 .toByteString());
@@ -256,7 +257,7 @@ public class LogServiceClient implements AutoCloseable {
     // TODO this name sucks, confusion WRT the Java Closeable interface.
     public void closeLog(LogName name) throws IOException {
         try (RaftClient client = getRaftClient(getLogInfo(name))) {
-            RaftClientReply reply = client.send(
+            RaftClientReply reply = client.io().send(
                 () -> LogServiceProtoUtil.toChangeStateRequestProto(name, 
State.CLOSED)
                     .toByteString());
             LogServiceProtos.ChangeStateReplyProto message =
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
index c43e916..706f65e 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
@@ -78,7 +78,7 @@ public class LogReaderImpl implements LogReader {
     try {
       RaftClientReply reply =
           raftClient
-              .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+              .io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
                   .toReadLogRequestProto(parent.getName(), currentRecordId, 
1).toByteString()));
       if (reply.getException() != null) {
         throw new IOException(reply.getException());
@@ -108,7 +108,7 @@ public class LogReaderImpl implements LogReader {
 
     Preconditions.checkNotNull(buffer, "buffer is NULL" );
     try {
-      RaftClientReply reply = 
raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
+      RaftClientReply reply = 
raftClient.io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
           .toReadLogRequestProto(parent.getName(), currentRecordId, 
1).toByteString()));
       if (reply.getException() != null) {
         throw new IOException(reply.getException());
@@ -135,7 +135,7 @@ public class LogReaderImpl implements LogReader {
 
     try {
       RaftClientReply reply = raftClient
-          .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+          .io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
               .toReadLogRequestProto(parent.getName(), currentRecordId, 
numRecords).toByteString()));
       if (reply.getException() != null) {
         throw new IOException(reply.getException());
@@ -166,7 +166,7 @@ public class LogReaderImpl implements LogReader {
     Preconditions.checkArgument(buffers.length > 0, "list of buffers is 
empty");
 
     try {
-      RaftClientReply reply = 
raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
+      RaftClientReply reply = 
raftClient.io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
           .toReadLogRequestProto(parent.getName(), currentRecordId, 
buffers.length).toByteString()));
       if (reply.getException() != null) {
         throw new IOException(reply.getException());
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
index f029fb3..8eff2e1 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
@@ -98,7 +98,7 @@ public class LogStreamImpl implements LogStream {
   }
 
   @Override public State getState() throws IOException {
-    RaftClientReply reply = raftClient.sendReadOnly(
+    RaftClientReply reply = raftClient.io().sendReadOnly(
         
Message.valueOf(LogServiceProtoUtil.toGetStateRequestProto(name).toByteString()));
     LogServiceProtos.GetStateReplyProto proto =
         
LogServiceProtos.GetStateReplyProto.parseFrom(reply.getMessage().getContent());
@@ -108,7 +108,7 @@ public class LogStreamImpl implements LogStream {
   @Override
   public long getSize() throws IOException{
     RaftClientReply reply = raftClient
-        .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+        .io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
             .toGetSizeRequestProto(name).toByteString()));
     if (reply.getException() != null) {
       throw new IOException(reply.getException());
@@ -126,7 +126,7 @@ public class LogStreamImpl implements LogStream {
   @Override
   public long getLength() throws IOException {
     RaftClientReply reply = raftClient
-        .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+        .io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
             .toGetLengthRequestProto(name).toByteString()));
     if (reply.getException() != null) {
       throw new IOException(reply.getException());
@@ -155,7 +155,7 @@ public class LogStreamImpl implements LogStream {
   public long getLastRecordId() throws IOException {
     try {
       RaftClientReply reply = raftClient
-          .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+          .io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
               .toGetLastCommittedIndexRequestProto(name).toByteString()));
       if (reply.getException() != null) {
         throw new IOException(reply.getException());
@@ -177,7 +177,7 @@ public class LogStreamImpl implements LogStream {
   public long getStartRecordId() throws IOException {
     try {
       RaftClientReply reply = raftClient
-          .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+          .io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
               .toGetStartIndexProto(name).toByteString()));
       if (reply.getException() != null) {
         throw new IOException(reply.getException());
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
index e292739..39f1a43 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
@@ -67,7 +67,7 @@ public class LogWriterImpl implements LogWriter {
   @Override
   public List<Long> write(List<ByteBuffer> list) throws IOException {
     try {
-      RaftClientReply reply = raftClient.send(
+      RaftClientReply reply = raftClient.io().send(
           
Message.valueOf(LogServiceProtoUtil.toAppendBBEntryLogRequestProto(parent.getName(),
 list).toByteString()));
       if (reply.getException() != null) {
         throw new IOException(reply.getException());
@@ -89,7 +89,7 @@ public class LogWriterImpl implements LogWriter {
  @Override
  public long sync() throws IOException {
      try {
-       RaftClientReply reply = raftClient.send(Message
+       RaftClientReply reply = raftClient.io().send(Message
            
.valueOf(LogServiceProtoUtil.toSyncLogRequestProto(parent.getName()).toByteString()));
        if (reply.getException() != null) {
          throw new IOException(reply.getException());
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
index b68a763..f1d9cf9 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
@@ -145,7 +145,7 @@ public class LogServer extends BaseServer {
                 .setClientId(ClientId.randomId())
                 .setProperties(properties)
                 .build();
-        metaClient.send(() -> 
MetaServiceProtoUtil.toPingRequestProto(peer).toByteString());
+        metaClient.io().send(() -> 
MetaServiceProtoUtil.toPingRequestProto(peer).toByteString());
         daemon = new Daemon(new HeartbeatSender(new 
RaftPeer(raftServer.getId())),
                 "heartbeat-Sender"+raftServer.getId());
         daemon.start();
@@ -200,7 +200,7 @@ public class LogServer extends BaseServer {
 
             while (true) {
                 try {
-                    metaClient.send(() -> MetaServiceProtoUtil.
+                    metaClient.io().send(() -> MetaServiceProtoUtil.
                             toHeartbeatRequestProto(peer).toByteString());
                     Thread.sleep(heartbeatInterval);
                 } catch (InterruptedException e) {
diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index f49f9dd..45ac3d0 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -722,7 +722,7 @@ public class LogStateMachine extends BaseStateMachine {
 
   private void sendArchiveLogrequestToNewLeader(long recordId, LogName logName, String location)
       throws IOException {
-    getClient().sendReadOnly(() -> LogServiceProtoUtil
+    getClient().io().sendReadOnly(() -> LogServiceProtoUtil
         .toArchiveLogRequestProto(logName, location, recordId, isArchivalRequest,
             ArchivalStatus.INTERRUPTED).toByteString());
   }
@@ -744,7 +744,7 @@ public class LogStateMachine extends BaseStateMachine {
   private void updateArchivingInfo(long recordId, LogName logName, String location,
       boolean isArchival, ArchivalStatus status)
       throws IOException {
-    RaftClientReply archiveLogReply = getClient().send(() -> LogServiceProtoUtil
+    RaftClientReply archiveLogReply = getClient().io().send(() -> LogServiceProtoUtil
         .toArchiveLogRequestProto(logName, location, recordId, isArchival, status).toByteString());
     LogServiceProtos.ArchiveLogReplyProto message=LogServiceProtos.ArchiveLogReplyProto
         .parseFrom(archiveLogReply.getMessage().getContent());
@@ -754,7 +754,7 @@ public class LogStateMachine extends BaseStateMachine {
   }
 
   private void sendChangeStateRequest(State st, boolean force) throws IOException {
-      getClient().send(
+      getClient().io().send(
           () -> LogServiceProtoUtil.toChangeStateRequestProto(LogName.of("Dummy"), st, force)
               .toByteString());
   }
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
index 2982595..6988d1a 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -270,7 +270,7 @@ public class MetaStateMachine extends BaseStateMachine {
             });
             try (RaftClient client = 
RaftClient.newBuilder().setRaftGroup(currentGroup)
                 
.setClientId(ClientId.randomId()).setProperties(properties).build()){
-                client.send(() -> 
MetaServiceProtos.MetaSMRequestProto.newBuilder()
+                client.io().send(() -> 
MetaServiceProtos.MetaSMRequestProto.newBuilder()
                         .setUnregisterRequest(
                                 
LogServiceUnregisterLogRequestProto.newBuilder()
                                         
.setLogname(LogServiceProtoUtil.toLogNameProto(logName)))
@@ -352,7 +352,7 @@ public class MetaStateMachine extends BaseStateMachine {
                 }
                 try (RaftClient client = 
RaftClient.newBuilder().setRaftGroup(currentGroup)
                     
.setClientId(ClientId.randomId()).setProperties(properties).build()){
-                    client.send(() -> 
MetaServiceProtos.MetaSMRequestProto.newBuilder()
+                    client.io().send(() -> 
MetaServiceProtos.MetaSMRequestProto.newBuilder()
                             
.setRegisterRequest(LogServiceRegisterLogRequestProto.newBuilder()
                                     
.setLogname(LogServiceProtoUtil.toLogNameProto(name))
                                     .setRaftGroup(MetaServiceProtoUtil
@@ -451,7 +451,7 @@ public class MetaStateMachine extends BaseStateMachine {
                                         LOG.warn(String.format("Peer %s in the 
group %s went down." +
                                                         " Hence closing the 
log %s serve by the group.",
                                                 raftPeer.toString(), 
group.toString(), logName.toString()));
-                                        RaftClientReply reply = client.send(
+                                        RaftClientReply reply = 
client.io().send(
                                                 () -> LogServiceProtoUtil.
                                                         
toChangeStateRequestProto(logName, LogStream.State.CLOSED, true)
                                                         .toByteString());
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 9cf4652..c41a284 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -123,7 +123,7 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
       try(final RaftClient client = cluster.createClient(leaderId)) {
         for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
           RaftClientReply
-              reply = client.send(new RaftTestUtil.SimpleMessage("m" + i));
+              reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
           Assert.assertTrue(reply.isSuccess());
         }
       }
@@ -159,7 +159,7 @@ public abstract class 
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
 
       // generate some more traffic
       try(final RaftClient client = 
cluster.createClient(cluster.getLeader().getId())) {
-        Assert.assertTrue(client.send(new RaftTestUtil.SimpleMessage("m" + 
i)).isSuccess());
+        Assert.assertTrue(client.io().send(new RaftTestUtil.SimpleMessage("m" 
+ i)).isSuccess());
       }
 
       final SnapshotInfo leaderSnapshotInfo = 
cluster.getLeader().getStateMachine().getLatestSnapshot();
@@ -203,7 +203,7 @@ public abstract class 
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
 
     try (final RaftClient client = cluster.createClient(leaderId)) {
       for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
-        final RaftClientReply reply = client.send(new 
RaftTestUtil.SimpleMessage("m" + i));
+        final RaftClientReply reply = client.io().send(new 
RaftTestUtil.SimpleMessage("m" + i));
         Assert.assertTrue(reply.isSuccess());
       }
     }
@@ -225,7 +225,7 @@ public abstract class 
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
 
     // generate some more traffic
     try (final RaftClient client = cluster.createClient(leader.getId())) {
-      Assert.assertTrue(client.send(new RaftTestUtil.SimpleMessage("m" + 
i)).isSuccess());
+      Assert.assertTrue(client.io().send(new RaftTestUtil.SimpleMessage("m" + 
i)).isSuccess());
     }
 
     FIVE_SECONDS.sleep();
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index 2f87cd8..8a40009 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -101,7 +101,7 @@ public abstract class LogAppenderTests<CLUSTER extends 
MiniRaftCluster>
       try {
         latch.await();
         for (SimpleMessage msg : messages) {
-          client.send(msg);
+          client.io().send(msg);
         }
         client.close();
         succeed.set(true);
@@ -134,7 +134,7 @@ public abstract class LogAppenderTests<CLUSTER extends 
MiniRaftCluster>
     // Write 10 messages to leader.
     try(RaftClient client = cluster.createClient(leaderServer.getId())) {
       for (int i = 1; i <= 10; i++) {
-        client.send(new RaftTestUtil.SimpleMessage("Msg to make leader ready " 
+  i));
+        client.io().send(new RaftTestUtil.SimpleMessage("Msg to make leader 
ready " +  i));
       }
     } catch (IOException e) {
       throw e;
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java 
b/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
index 5da5c91..f49564c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
@@ -73,11 +73,11 @@ public abstract class MessageStreamApiTests<CLUSTER extends 
MiniRaftCluster> ext
     final String k = key.toString();
     try(RaftClient client = cluster.createClient()) {
       final String k1 = k.substring(0, endOfRequest);
-      final RaftClientReply r1= client.sendReadOnly(new SimpleMessage(k1));
+      final RaftClientReply r1= client.io().sendReadOnly(new 
SimpleMessage(k1));
       Assert.assertTrue(r1.isSuccess());
 
       final String k2 = k.substring(endOfRequest);
-      final RaftClientReply r2 = client.sendReadOnly(new SimpleMessage(k2));
+      final RaftClientReply r2 = client.io().sendReadOnly(new 
SimpleMessage(k2));
       Assert.assertTrue(r2.isSuccess());
     }
   }
@@ -119,7 +119,7 @@ public abstract class MessageStreamApiTests<CLUSTER extends 
MiniRaftCluster> ext
 
     // check if all the parts are streamed as a single message.
     try(RaftClient client = cluster.createClient()) {
-      final RaftClientReply reply = client.sendReadOnly(new 
SimpleMessage(bytes.toString(StandardCharsets.UTF_8)));
+      final RaftClientReply reply = client.io().sendReadOnly(new 
SimpleMessage(bytes.toString(StandardCharsets.UTF_8)));
       Assert.assertTrue(reply.isSuccess());
     }
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java 
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 2b7a19f..fb6050a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -492,7 +492,7 @@ public abstract class MiniRaftCluster implements Closeable {
   public RaftServerImpl getLeaderAndSendFirstMessage(boolean ignoreException) 
throws IOException {
     final RaftServerImpl leader = getLeader();
     try(RaftClient client = createClient(leader.getId())) {
-      client.send(new RaftTestUtil.SimpleMessage("first msg to make leader 
ready"));
+      client.io().send(new RaftTestUtil.SimpleMessage("first msg to make 
leader ready"));
     } catch (IOException e) {
       if (!ignoreException) {
         throw e;
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
index 23b63fe..2a47164 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
@@ -99,7 +99,7 @@ public abstract class RaftAsyncExceptionTests<CLUSTER extends 
MiniRaftCluster>
   private void runTestTimeoutException(CLUSTER cluster) throws Exception {
     // send a message to make sure the cluster is working
     try(RaftClient client = cluster.createClient()) {
-      final RaftClientReply reply = client.send(new SimpleMessage("m0"));
+      final RaftClientReply reply = client.io().send(new SimpleMessage("m0"));
       Assert.assertTrue(reply.isSuccess());
 
       RaftClientConfigKeys.Rpc.setRequestTimeout(properties.get(), ONE_SECOND);
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 7e1e47e..48d310b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -147,7 +147,7 @@ public abstract class RaftBasicTests<CLUSTER extends 
MiniRaftCluster>
             }
           });
         } else {
-          final RaftClientReply reply = client.send(message);
+          final RaftClientReply reply = client.io().send(message);
           Assert.assertTrue(reply.isSuccess());
         }
       }
@@ -288,7 +288,7 @@ public abstract class RaftBasicTests<CLUSTER extends 
MiniRaftCluster>
         for (int i = 0; i < messages.length; i++) {
           if (!useAsync) {
             final RaftClientReply reply =
-                client.send(messages[step.getAndIncrement()]);
+                client.io().send(messages[step.getAndIncrement()]);
             Assert.assertTrue(reply.isSuccess());
           } else {
             final CompletableFuture<RaftClientReply> replyFuture =
@@ -436,7 +436,7 @@ public abstract class RaftBasicTests<CLUSTER extends 
MiniRaftCluster>
         CompletableFuture<RaftClientReply> replyFuture = 
client.async().send(new SimpleMessage("abc"));
         replyFuture.get();
       } else {
-        client.send(new SimpleMessage("abc"));
+        client.io().send(new SimpleMessage("abc"));
       }
       // Eventually the request would be accepted by the server
       // when the retry cache entry is invalidated.
@@ -467,7 +467,7 @@ public abstract class RaftBasicTests<CLUSTER extends 
MiniRaftCluster>
         CompletableFuture<RaftClientReply> replyFuture = 
client.async().send(new SimpleMessage("abc"));
         replyFuture.get();
       } else {
-        client.send(new SimpleMessage("abc"));
+        client.io().send(new SimpleMessage("abc"));
       }
 
       long appliedIndexAfter = (Long) appliedIndexGauge.getValue();
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 8b6fd2d..bebf940 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -91,7 +91,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends 
MiniRaftCluster>
   }
 
   static void sendMessage(String message, RaftClient client) throws 
IOException {
-    final RaftClientReply reply = client.send(new SimpleMessage(message));
+    final RaftClientReply reply = client.io().send(new SimpleMessage(message));
     Assert.assertTrue(reply.isSuccess());
   }
 
@@ -149,11 +149,11 @@ public abstract class RaftExceptionBaseTest<CLUSTER 
extends MiniRaftCluster>
     // Create client using another group
     try(RaftClient client = cluster.createClient(anotherGroup)) {
       testFailureCase("send(..) with client group being different from the 
server group",
-          () -> client.send(Message.EMPTY),
+          () -> client.io().send(Message.EMPTY),
           GroupMismatchException.class);
 
       testFailureCase("sendReadOnly(..) with client group being different from 
the server group",
-          () -> client.sendReadOnly(Message.EMPTY),
+          () -> client.io().sendReadOnly(Message.EMPTY),
           GroupMismatchException.class);
 
       testFailureCase("setConfiguration(..) with client group being different 
from the server group",
@@ -177,7 +177,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends 
MiniRaftCluster>
     try (RaftClient client = cluster.createClient()) {
       final RaftPeerId follower = 
cluster.getFollowers().iterator().next().getId();
       testFailureCase("sendStaleRead(..) with a large commit index",
-          () -> client.sendStaleRead(Message.EMPTY, 1_000_000_000L, follower),
+          () -> client.io().sendStaleRead(Message.EMPTY, 1_000_000_000L, 
follower),
           StateMachineException.class, StaleReadException.class);
     }
   }
@@ -194,7 +194,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends 
MiniRaftCluster>
     SimpleMessage msg = new SimpleMessage(new String(bytes));
     try (RaftClient client = cluster.createClient(leaderId)) {
       testFailureCase("testLogAppenderBufferCapacity",
-          () -> client.send(msg),
+          () -> client.io().send(msg),
           StateMachineException.class, RaftLogIOException.class);
     }
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index f743bc9..05435ca 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -456,7 +456,7 @@ public interface RaftTestUtil {
     Thread t = new Thread(() -> {
       try (final RaftClient client = cluster.createClient(leaderId)) {
         for (SimpleMessage mssg: messages) {
-          client.send(mssg);
+          client.io().send(mssg);
         }
       } catch (Exception e) {
         e.printStackTrace();
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java 
b/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
index df7ba15..a1084f8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
@@ -99,14 +99,14 @@ public abstract class RequestLimitAsyncBaseTest<CLUSTER 
extends MiniRaftCluster>
       try(RaftClient c2 = cluster.createClient(leader.getId(), 
RetryPolicies.noRetry())) {
         // more write requests should get ResourceUnavailableException
         final SimpleMessage message = new SimpleMessage("err");
-        testFailureCase("send should fail", () -> c2.send(message),
+        testFailureCase("send should fail", () -> c2.io().send(message),
             ResourceUnavailableException.class);
         testFailureCase("sendAsync should fail", () -> 
c2.async().send(message).get(),
             ExecutionException.class, ResourceUnavailableException.class);
 
         // more watch requests should get ResourceUnavailableException
         final long watchIndex = watchBase + watchElementLimit;
-        testFailureCase("sendWatch should fail", () -> 
c2.sendWatch(watchIndex, ReplicationLevel.ALL),
+        testFailureCase("sendWatch should fail", () -> 
c2.io().watch(watchIndex, ReplicationLevel.ALL),
             ResourceUnavailableException.class);
         testFailureCase("sendWatchAsync should fail", () -> 
c2.async().watch(watchIndex, ReplicationLevel.ALL).get(),
             ExecutionException.class, ResourceUnavailableException.class);
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupInfoBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupInfoBaseTest.java
index bde28a7..0e563eb 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupInfoBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupInfoBaseTest.java
@@ -121,7 +121,7 @@ public abstract class GroupInfoBaseTest<CLUSTER extends 
MiniRaftCluster>
     RaftClientReply reply = null;
     try(final RaftClient client = cluster.createClient()) {
       for(int i = 0; i < n; i++) {
-        reply = client.send(Message.valueOf("m" + i));
+        reply = client.io().send(Message.valueOf("m" + i));
       }
     }
     return reply;
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index f8ebf4f..3c2de57 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -143,7 +143,7 @@ public abstract class GroupManagementBaseTest extends 
BaseTest {
     // when suggested leader rejoin cluster, it will catch up log first.
     try (final RaftClient client = cluster.createClient(newGroup)) {
       for (int i = 0; i < 10; i ++) {
-        RaftClientReply reply = client.send(new RaftTestUtil.SimpleMessage("m" 
+ i));
+        RaftClientReply reply = client.io().send(new 
RaftTestUtil.SimpleMessage("m" + i));
         Assert.assertTrue(reply.isSuccess());
       }
     }
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 1ccbc17..844d9cc 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -159,10 +159,10 @@ public abstract class LeaderElectionTests<CLUSTER extends 
MiniRaftCluster>
 
       final RaftServerImpl leader = waitForLeader(cluster);
       try (RaftClient client = cluster.createClient(leader.getId())) {
-        client.send(new RaftTestUtil.SimpleMessage("message"));
+        client.io().send(new RaftTestUtil.SimpleMessage("message"));
         Thread.sleep(1000);
         isolate(cluster, leader.getId());
-        RaftClientReply reply = client.send(new 
RaftTestUtil.SimpleMessage("message"));
+        RaftClientReply reply = client.io().send(new 
RaftTestUtil.SimpleMessage("message"));
         Assert.assertNotEquals(reply.getReplierId(), 
leader.getId().toString());
         Assert.assertTrue(reply.isSuccess());
       } finally {
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index ae38622..b54fbdc 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -196,7 +196,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER 
extends MiniRaftCluste
 
         // submit some msgs before reconf
         for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) {
-          RaftClientReply reply = client.send(new SimpleMessage("m" + i));
+          RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
           Assert.assertTrue(reply.isSuccess());
         }
 
@@ -326,7 +326,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER 
extends MiniRaftCluste
 
         // submit some msgs before reconf
         for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) {
-          RaftClientReply reply = client.send(new SimpleMessage("m" + i));
+          RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
           Assert.assertTrue(reply.isSuccess());
         }
 
@@ -465,7 +465,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER 
extends MiniRaftCluste
   void runTestNoChangeRequest(CLUSTER cluster) throws Exception {
     final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
     try(final RaftClient client = cluster.createClient(leader.getId())) {
-      client.send(new SimpleMessage("m"));
+      client.io().send(new SimpleMessage("m"));
 
       final RaftLog leaderLog = leader.getState().getLog();
       final long committedIndex = leaderLog.getLastCommittedIndex();
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index e6e5c2b..7406c44 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -87,7 +87,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER 
extends MiniRaftClu
     RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
 
     try(final RaftClient client = cluster.createClient(leaderId)) {
-      client.send(new RaftTestUtil.SimpleMessage("m"));
+      client.io().send(new RaftTestUtil.SimpleMessage("m"));
       fail("Exception expected");
     } catch (StateMachineException e) {
       e.printStackTrace();
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
index 112f118..a6ffb30 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
@@ -90,13 +90,13 @@ public abstract class StateMachineShutdownTests<CLUSTER 
extends MiniRaftCluster>
     cluster.getLeaderAndSendFirstMessage(true);
 
     try (final RaftClient client = cluster.createClient(leaderId)) {
-      client.send(new RaftTestUtil.SimpleMessage("message"));
-      RaftClientReply reply = client.send(
+      client.io().send(new RaftTestUtil.SimpleMessage("message"));
+      RaftClientReply reply = client.io().send(
               new RaftTestUtil.SimpleMessage("message2"));
 
       long logIndex = reply.getLogIndex();
       //Confirm that followers have committed
-      RaftClientReply watchReply = client.sendWatch(
+      RaftClientReply watchReply = client.io().watch(
               logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED);
       watchReply.getCommitInfos().forEach(
               val -> Assert.assertTrue(val.getCommitIndex() >= logIndex));
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index e9b90a2..1945e91 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -145,7 +145,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest 
{
     int i = 0;
     try(final RaftClient client = cluster.createClient(leaderId)) {
       for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
-        RaftClientReply reply = client.send(new SimpleMessage("m" + i));
+        RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
         Assert.assertTrue(reply.isSuccess());
       }
     }
@@ -192,7 +192,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest 
{
 
       try(final RaftClient client = cluster.createClient(leaderId)) {
         for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
-          RaftClientReply reply = client.send(new SimpleMessage("m" + i));
+          RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
           Assert.assertTrue(reply.isSuccess());
         }
       }
@@ -227,7 +227,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest 
{
 
       // generate some more traffic
       try(final RaftClient client = 
cluster.createClient(cluster.getLeader().getId())) {
-        Assert.assertTrue(client.send(new SimpleMessage("m" + i)).isSuccess());
+        Assert.assertTrue(client.io().send(new SimpleMessage("m" + 
i)).isSuccess());
       }
 
       // add two more peers
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
index 6c37589..8a47f5d 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -63,7 +63,7 @@ public class TestLogAppenderWithGrpc
 
     // client and leader setup
     try (final RaftClient client = cluster.createClient(cluster.getGroup())) {
-      client.send(new RaftTestUtil.SimpleMessage("m"));
+      client.io().send(new RaftTestUtil.SimpleMessage("m"));
       RaftServerImpl leader = waitForLeader(cluster);
       long initialNextIndex = leader.getState().getNextIndex();
 
@@ -104,7 +104,7 @@ public class TestLogAppenderWithGrpc
     // Send some messages
     try(RaftClient client = cluster.createClient(leader.getId())) {
       for(int i = 0; i < 10; i++) {
-        final RaftClientReply reply = client.send(new 
RaftTestUtil.SimpleMessage("m" + ++messageCount));
+        final RaftClientReply reply = client.io().send(new 
RaftTestUtil.SimpleMessage("m" + ++messageCount));
         Assert.assertTrue(reply.isSuccess());
       }
     }
@@ -121,7 +121,7 @@ public class TestLogAppenderWithGrpc
     // Send some more messages
     try(RaftClient client = cluster.createClient(leader.getId())) {
       for(int i = 0; i < 10; i++) {
-        final RaftClientReply reply = client.send(new 
RaftTestUtil.SimpleMessage("m" + ++messageCount));
+        final RaftClientReply reply = client.io().send(new 
RaftTestUtil.SimpleMessage("m" + ++messageCount));
         Assert.assertTrue(reply.isSuccess());
       }
     }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java 
b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index f8cec9a..4814d25 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -143,7 +143,7 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
       for(int i = 0; i < messageCount.get(); i++) {
         if (i != truncatedMessageIndex) {
           final Message m = new SimpleMessage("m" + i);
-          final RaftClientReply reply = client.sendReadOnly(m);
+          final RaftClientReply reply = client.io().sendReadOnly(m);
           Assert.assertTrue(reply.isSuccess());
           LOG.info("query {}: {} {}", m, reply, 
LogEntryProto.parseFrom(reply.getMessage().getContent()));
         }
@@ -155,7 +155,7 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
     try(final RaftClient client = cluster.createClient()) {
       // write some messages
       for(int i = 0; i < 10; i++) {
-        Assert.assertTrue(client.send(newMessage.get()).isSuccess());
+        Assert.assertTrue(client.io().send(newMessage.get()).isSuccess());
       }
     }
   }
@@ -239,7 +239,7 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
       final SimpleMessage m = messages[i];
       new Thread(() -> {
         try (final RaftClient client = cluster.createClient()) {
-          Assert.assertTrue(client.send(m).isSuccess());
+          Assert.assertTrue(client.io().send(m).isSuccess());
         } catch (IOException e) {
           throw new IllegalStateException("Failed to send " + m, e);
         }
@@ -356,11 +356,11 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
     final SimpleMessage lastMessage = messages[messages.length - 1];
     try (final RaftClient client = cluster.createClient()) {
       for (SimpleMessage m : messages) {
-        Assert.assertTrue(client.send(m).isSuccess());
+        Assert.assertTrue(client.io().send(m).isSuccess());
       }
 
       // assert that the last message exists
-      Assert.assertTrue(client.sendReadOnly(lastMessage).isSuccess());
+      Assert.assertTrue(client.io().sendReadOnly(lastMessage).isSuccess());
     }
 
     final RaftLog log = leader.getState().getLog();
@@ -382,7 +382,7 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
     cluster.restartServer(id, false);
     testFailureCase("last-entry-not-found", () -> {
       try (final RaftClient client = cluster.createClient()) {
-        client.sendReadOnly(lastMessage);
+        client.io().sendReadOnly(lastMessage);
       }
     }, StateMachineException.class, IndexOutOfBoundsException.class);
   }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
index 737c8a9..dfb8af6 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
@@ -30,8 +30,6 @@ import static 
org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_ENQUE
 import static 
org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_EXECUTION_TIME;
 import static 
org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_QUEUE_TIME;
 import static 
org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_WORKER_QUEUE_SIZE;
-import static 
org.apache.ratis.server.metrics.RaftLogMetrics.METADATA_LOG_ENTRY_COUNT;
-import static 
org.apache.ratis.server.metrics.RaftLogMetrics.CONFIG_LOG_ENTRY_COUNT;
 import static 
org.apache.ratis.server.metrics.RaftLogMetrics.STATE_MACHINE_LOG_ENTRY_COUNT;
 import static 
org.apache.ratis.metrics.RatisMetrics.RATIS_APPLICATION_NAME_METRICS;
 
@@ -104,7 +102,7 @@ public class TestRaftLogMetrics extends BaseTest
 
     try (final RaftClient client = cluster.createClient()) {
       for (RaftTestUtil.SimpleMessage message : messages) {
-        client.send(message);
+        client.io().send(message);
       }
     }
 
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java 
b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
index 6c02e72..180f970 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
@@ -144,7 +144,7 @@ public class TestStateMachine extends BaseTest implements 
MiniRaftClusterWithSim
     final RaftTestUtil.SimpleMessage[] messages = 
RaftTestUtil.SimpleMessage.create(numTrx);
     try(final RaftClient client = cluster.createClient()) {
       for (RaftTestUtil.SimpleMessage message : messages) {
-        client.send(message);
+        client.io().send(message);
       }
     }
 

Reply via email to