This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f78403a RATIS-1093. Move blocking methods from RaftClient to a new
BlockingApi interface (#219). Contributed by Rui Wang
f78403a is described below
commit f78403a08700db47af7204da2f9ce46c2febc0ff
Author: Rui Wang <[email protected]>
AuthorDate: Mon Oct 12 22:51:31 2020 -0700
RATIS-1093. Move blocking methods from RaftClient to a new BlockingApi
interface (#219). Contributed by Rui Wang
---
.../java/org/apache/ratis/client/RaftClient.java | 24 ++-------
.../org/apache/ratis/client/api/BlockingApi.java | 48 ++++++++++++++++++
.../org/apache/ratis/client/impl/BlockingImpl.java | 57 ++++++++++++++++++++++
.../apache/ratis/client/impl/RaftClientImpl.java | 29 +++--------
.../ratis/examples/arithmetic/cli/Assign.java | 2 +-
.../apache/ratis/examples/arithmetic/cli/Get.java | 2 +-
.../examples/counter/client/CounterClient.java | 4 +-
.../ratis/examples/filestore/FileStoreClient.java | 4 +-
.../ratis/examples/arithmetic/TestArithmetic.java | 2 +-
.../apache/ratis/examples/counter/TestCounter.java | 12 ++---
.../ratis/logservice/api/LogServiceClient.java | 15 +++---
.../ratis/logservice/impl/LogReaderImpl.java | 8 +--
.../ratis/logservice/impl/LogStreamImpl.java | 10 ++--
.../ratis/logservice/impl/LogWriterImpl.java | 4 +-
.../apache/ratis/logservice/server/LogServer.java | 4 +-
.../ratis/logservice/server/LogStateMachine.java | 6 +--
.../ratis/logservice/server/MetaStateMachine.java | 6 +--
.../ratis/InstallSnapshotNotificationTests.java | 8 +--
.../java/org/apache/ratis/LogAppenderTests.java | 4 +-
.../org/apache/ratis/MessageStreamApiTests.java | 6 +--
.../java/org/apache/ratis/MiniRaftCluster.java | 2 +-
.../org/apache/ratis/RaftAsyncExceptionTests.java | 2 +-
.../test/java/org/apache/ratis/RaftBasicTests.java | 8 +--
.../org/apache/ratis/RaftExceptionBaseTest.java | 10 ++--
.../test/java/org/apache/ratis/RaftTestUtil.java | 2 +-
.../apache/ratis/RequestLimitAsyncBaseTest.java | 4 +-
.../ratis/server/impl/GroupInfoBaseTest.java | 2 +-
.../ratis/server/impl/GroupManagementBaseTest.java | 2 +-
.../ratis/server/impl/LeaderElectionTests.java | 4 +-
.../server/impl/RaftReconfigurationBaseTest.java | 6 +--
.../impl/RaftStateMachineExceptionTests.java | 2 +-
.../server/impl/StateMachineShutdownTests.java | 6 +--
.../ratis/statemachine/RaftSnapshotBaseTest.java | 6 +--
.../apache/ratis/grpc/TestLogAppenderWithGrpc.java | 6 +--
.../apache/ratis/server/ServerRestartTests.java | 12 ++---
.../ratis/server/raftlog/TestRaftLogMetrics.java | 4 +-
.../ratis/statemachine/TestStateMachine.java | 2 +-
37 files changed, 203 insertions(+), 132 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 7794817..c5decd6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -19,6 +19,7 @@ package org.apache.ratis.client;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.api.AsyncApi;
+import org.apache.ratis.client.api.BlockingApi;
import org.apache.ratis.client.api.DataStreamApi;
import org.apache.ratis.client.api.GroupManagementApi;
import org.apache.ratis.client.api.MessageStreamApi;
@@ -29,7 +30,6 @@ import org.apache.ratis.protocol.*;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.util.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +60,9 @@ public interface RaftClient extends Closeable {
/** @return the {@link MessageStreamApi}. */
MessageStreamApi getMessageStreamApi();
+ /** @return the {@link BlockingApi}. */
+ BlockingApi io();
+
/** @return the {@link DataStreamApi}. */
default DataStreamApi getDataStreamApi() {
// TODO RATIS-1090: Implements this once the streaming feature has become usable.
@@ -67,25 +70,6 @@ public interface RaftClient extends Closeable {
JavaUtils.getCurrentStackTraceElement().getMethodName() + " is not yet
supported.");
}
- /**
- * Send the given message to the raft service.
- * The message may change the state of the service.
- * For readonly messages, use {@link #sendReadOnly(Message)} instead.
- *
- * @param message The request message.
- * @return the reply.
- */
- RaftClientReply send(Message message) throws IOException;
-
- /** Send the given readonly message to the raft service. */
- RaftClientReply sendReadOnly(Message message) throws IOException;
-
- /** Send the given stale-read message to the given server (not the raft service). */
- RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId
server) throws IOException;
-
- /** Watch the given index to satisfy the given replication level. */
- RaftClientReply sendWatch(long index, ReplicationLevel replication) throws
IOException;
-
/** Send set configuration request to the raft service. */
RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws
IOException;
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
new file mode 100644
index 0000000..dc14cd5
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.api;
+
+import java.io.IOException;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+
+/**
+ * APIs to support blocking operations such as send message, send (stale)read message and watch request.
+ */
+public interface BlockingApi {
+ /**
+ * Send the given message to the raft service.
+ * The message may change the state of the service.
+ * For readonly messages, use {@link #sendReadOnly(Message)} instead.
+ *
+ * @param message The request message.
+ * @return the reply.
+ */
+ RaftClientReply send(Message message) throws IOException;
+
+ /** Send the given readonly message to the raft service. */
+ RaftClientReply sendReadOnly(Message message) throws IOException;
+
+ /** Send the given stale-read message to the given server (not the raft service). */
+ RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId
server) throws IOException;
+
+ /** Watch the given index to satisfy the given replication level. */
+ RaftClientReply watch(long index, ReplicationLevel replication) throws
IOException;
+}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
new file mode 100644
index 0000000..d73c227
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import java.io.IOException;
+import java.util.Objects;
+import org.apache.ratis.client.api.BlockingApi;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
+
+/** Blocking api implementations. */
+class BlockingImpl implements BlockingApi {
+ private final RaftClientImpl client;
+
+ BlockingImpl(RaftClientImpl client) {
+ this.client = Objects.requireNonNull(client, "client == null");
+ }
+
+ @Override
+ public RaftClientReply send(Message message) throws IOException {
+ return client.send(RaftClientRequest.writeRequestType(), message, null);
+ }
+
+ @Override
+ public RaftClientReply sendReadOnly(Message message) throws IOException {
+ return client.send(RaftClientRequest.readRequestType(), message, null);
+ }
+
+ @Override
+ public RaftClientReply sendStaleRead(Message message, long minIndex,
RaftPeerId server)
+ throws IOException {
+ return client.send(RaftClientRequest.staleReadRequestType(minIndex),
message, server);
+ }
+
+ @Override
+ public RaftClientReply watch(long index, ReplicationLevel replication)
throws IOException {
+ return client.send(RaftClientRequest.watchRequestType(index, replication),
null, null);
+ }
+}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 25b4eab..9d096ce 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -25,7 +25,6 @@ import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.client.api.MessageStreamApi;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
-import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
@@ -207,28 +206,7 @@ public final class RaftClientImpl implements RaftClient {
callId, message, type, slidingWindowEntry);
}
- @Override
- public RaftClientReply send(Message message) throws IOException {
- return send(RaftClientRequest.writeRequestType(), message, null);
- }
-
- @Override
- public RaftClientReply sendReadOnly(Message message) throws IOException {
- return send(RaftClientRequest.readRequestType(), message, null);
- }
-
- @Override
- public RaftClientReply sendStaleRead(Message message, long minIndex,
RaftPeerId server)
- throws IOException {
- return send(RaftClientRequest.staleReadRequestType(minIndex), message,
server);
- }
-
- @Override
- public RaftClientReply sendWatch(long index, ReplicationLevel replication)
throws IOException {
- return send(RaftClientRequest.watchRequestType(index, replication), null,
null);
- }
-
- private RaftClientReply send(RaftClientRequest.Type type, Message message,
RaftPeerId server)
+ RaftClientReply send(RaftClientRequest.Type type, Message message,
RaftPeerId server)
throws IOException {
if (!type.is(TypeCase.WATCH)) {
Objects.requireNonNull(message, "message == null");
@@ -261,6 +239,11 @@ public final class RaftClientImpl implements RaftClient {
return new GroupManagementImpl(server, this);
}
+ @Override
+ public BlockingImpl io() {
+ return new BlockingImpl(this);
+ }
+
void addServers(Stream<RaftPeer> peersInNewConf) {
clientRpc.addServers(
peersInNewConf.filter(p -> !peers.contains(p))::iterator);
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Assign.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Assign.java
index 3ae8cd6..3c6d774 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Assign.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Assign.java
@@ -56,7 +56,7 @@ public class Assign extends Client {
@Override
protected void operation(RaftClient client) throws IOException {
- RaftClientReply send = client.send(
+ RaftClientReply send = client.io().send(
new AssignmentMessage(new Variable(name), createExpression(value)));
System.out.println("Success: " + send.isSuccess());
System.out.println("Response: " + send.getMessage().getClass());
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Get.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Get.java
index c274334..89050f1 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Get.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/cli/Get.java
@@ -40,7 +40,7 @@ public class Get extends Client {
@Override
protected void operation(RaftClient client) throws IOException {
RaftClientReply getValue =
- client.sendReadOnly(Expression.Utils.toMessage(new Variable(name)));
+ client.io().sendReadOnly(Expression.Utils.toMessage(new
Variable(name)));
Expression response =
Expression.Utils.bytes2Expression(getValue.getMessage().getContent().toByteArray(),
0);
System.out.println(String.format("%s=%s", name, (DoubleValue)
response).toString());
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
index faa0cd8..eafdb03 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/client/CounterClient.java
@@ -62,7 +62,7 @@ public final class CounterClient {
System.out.printf("Sending %d increment command...\n", increment);
for (int i = 0; i < increment; i++) {
executorService.submit(() ->
- raftClient.send(Message.valueOf("INCREMENT")));
+ raftClient.io().send(Message.valueOf("INCREMENT")));
}
//shutdown the executor service and wait until they finish their work
@@ -70,7 +70,7 @@ public final class CounterClient {
executorService.awaitTermination(increment * 500L, TimeUnit.MILLISECONDS);
//send GET command and print the response
- RaftClientReply count = raftClient.sendReadOnly(Message.valueOf("GET"));
+ RaftClientReply count =
raftClient.io().sendReadOnly(Message.valueOf("GET"));
String response =
count.getMessage().getContent().toString(Charset.defaultCharset());
System.out.println(response);
}
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index f6269bb..8ce4487 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -95,11 +95,11 @@ public class FileStoreClient implements Closeable {
}
private ByteString send(ByteString request) throws IOException {
- return send(request, client::send);
+ return send(request, client.io()::send);
}
private ByteString sendReadOnly(ByteString request) throws IOException {
- return send(request, client::sendReadOnly);
+ return send(request, client.io()::sendReadOnly);
}
private CompletableFuture<ByteString> sendAsync(ByteString request) {
diff --git
a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
index f2cedce..da990fa 100644
---
a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
+++
b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
@@ -153,7 +153,7 @@ public class TestArithmetic extends ParameterizedBaseTest {
}
static Expression assign(RaftClient client, Variable x, Expression e, Double
expected) throws IOException {
- final RaftClientReply r = client.send(x.assign(e));
+ final RaftClientReply r = client.io().send(x.assign(e));
return assertRaftClientReply(r, expected);
}
diff --git
a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
index beb71be..d0aa86e 100644
---
a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
+++
b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java
@@ -47,21 +47,21 @@ public class TestCounter extends ParameterizedBaseTest {
setAndStart(cluster);
try (final RaftClient client = cluster.createClient()) {
for (int i = 0; i < 10; i++) {
- client.send(Message.valueOf("INCREMENT"));
+ client.io().send(Message.valueOf("INCREMENT"));
}
- RaftClientReply reply1 = client.sendReadOnly(Message.valueOf("GET"));
+ RaftClientReply reply1 =
client.io().sendReadOnly(Message.valueOf("GET"));
Assert.assertEquals("10",
reply1.getMessage().getContent().toString(Charset.defaultCharset()));
for (int i = 0; i < 10; i++) {
- client.send(Message.valueOf("INCREMENT"));
+ client.io().send(Message.valueOf("INCREMENT"));
}
- RaftClientReply reply2 = client.sendReadOnly(Message.valueOf("GET"));
+ RaftClientReply reply2 =
client.io().sendReadOnly(Message.valueOf("GET"));
Assert.assertEquals("20",
reply2.getMessage().getContent().toString(Charset.defaultCharset()));
for (int i = 0; i < 10; i++) {
- client.send(Message.valueOf("INCREMENT"));
+ client.io().send(Message.valueOf("INCREMENT"));
}
- RaftClientReply reply3 = client.sendReadOnly(Message.valueOf("GET"));
+ RaftClientReply reply3 =
client.io().sendReadOnly(Message.valueOf("GET"));
Assert.assertEquals("30",
reply3.getMessage().getContent().toString(Charset.defaultCharset()));
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceClient.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceClient.java
index dd25967..559c263 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceClient.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogServiceClient.java
@@ -93,7 +93,7 @@ public class LogServiceClient implements AutoCloseable {
* @throws IOException
*/
public LogStream createLog(LogName logName) throws IOException {
- RaftClientReply reply = client.sendReadOnly(
+ RaftClientReply reply = client.io().sendReadOnly(
() ->
MetaServiceProtoUtil.toCreateLogRequestProto(logName).toByteString());
CreateLogReplyProto message =
CreateLogReplyProto.parseFrom(reply.getMessage().getContent());
@@ -138,7 +138,7 @@ public class LogServiceClient implements AutoCloseable {
public List<ArchivalInfo> getExportStatus(LogName logName) throws
IOException {
try (RaftClient client = getRaftClient(getLogInfo(logName))) {
- RaftClientReply exportInfoReply = client.sendReadOnly(
+ RaftClientReply exportInfoReply = client.io().sendReadOnly(
() ->
LogServiceProtoUtil.toExportInfoRequestProto(logName).toByteString());
LogServiceProtos.GetExportInfoReplyProto message =
LogServiceProtos.GetExportInfoReplyProto
@@ -153,7 +153,7 @@ public class LogServiceClient implements AutoCloseable {
}
public void deleteLog(LogName logName) throws IOException {
- RaftClientReply reply = client.sendReadOnly
+ RaftClientReply reply = client.io().sendReadOnly
(() ->
MetaServiceProtoUtil.toDeleteLogRequestProto(logName).toByteString());
DeleteLogReplyProto message =
DeleteLogReplyProto.parseFrom(reply.getMessage().getContent());
if(message.hasException()) {
@@ -167,7 +167,7 @@ public class LogServiceClient implements AutoCloseable {
* @throws IOException
*/
public List<LogInfo> listLogs() throws IOException {
- RaftClientReply reply = client.sendReadOnly
+ RaftClientReply reply = client.io().sendReadOnly
(() ->
MetaServiceProtoUtil.toListLogRequestProto().toByteString());
ListLogsReplyProto message =
ListLogsReplyProto.parseFrom(reply.getMessage().getContent());
List<LogInfoProto> infoProtos = message.getLogsList();
@@ -197,7 +197,7 @@ public class LogServiceClient implements AutoCloseable {
}
private LogInfo getLogInfo(LogName logName) throws IOException {
- RaftClientReply reply = client.sendReadOnly(
+ RaftClientReply reply = client.io().sendReadOnly(
() ->
MetaServiceProtoUtil.toGetLogRequestProto(logName).toByteString());
GetLogReplyProto message =
GetLogReplyProto.parseFrom(reply.getMessage().getContent());
if (message.hasException()) {
@@ -234,7 +234,8 @@ public class LogServiceClient implements AutoCloseable {
*/
public void exportLog(LogName logName, String location, long recordId)
throws IOException {
try (RaftClient client = getRaftClient(getLogInfo(logName))) {
- RaftClientReply archiveLogReply = client.sendReadOnly(() ->
LogServiceProtoUtil
+ RaftClientReply archiveLogReply =
+ client.io().sendReadOnly(() -> LogServiceProtoUtil
.toArchiveLogRequestProto(logName, location, recordId,
location == null ? true : false,
ArchivalInfo.ArchivalStatus.SUBMITTED)
.toByteString());
@@ -256,7 +257,7 @@ public class LogServiceClient implements AutoCloseable {
// TODO this name sucks, confusion WRT the Java Closeable interface.
public void closeLog(LogName name) throws IOException {
try (RaftClient client = getRaftClient(getLogInfo(name))) {
- RaftClientReply reply = client.send(
+ RaftClientReply reply = client.io().send(
() -> LogServiceProtoUtil.toChangeStateRequestProto(name,
State.CLOSED)
.toByteString());
LogServiceProtos.ChangeStateReplyProto message =
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
index c43e916..706f65e 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogReaderImpl.java
@@ -78,7 +78,7 @@ public class LogReaderImpl implements LogReader {
try {
RaftClientReply reply =
raftClient
- .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+ .io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
.toReadLogRequestProto(parent.getName(), currentRecordId,
1).toByteString()));
if (reply.getException() != null) {
throw new IOException(reply.getException());
@@ -108,7 +108,7 @@ public class LogReaderImpl implements LogReader {
Preconditions.checkNotNull(buffer, "buffer is NULL" );
try {
- RaftClientReply reply =
raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
+ RaftClientReply reply =
raftClient.io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
.toReadLogRequestProto(parent.getName(), currentRecordId,
1).toByteString()));
if (reply.getException() != null) {
throw new IOException(reply.getException());
@@ -135,7 +135,7 @@ public class LogReaderImpl implements LogReader {
try {
RaftClientReply reply = raftClient
- .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+ .io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
.toReadLogRequestProto(parent.getName(), currentRecordId,
numRecords).toByteString()));
if (reply.getException() != null) {
throw new IOException(reply.getException());
@@ -166,7 +166,7 @@ public class LogReaderImpl implements LogReader {
Preconditions.checkArgument(buffers.length > 0, "list of buffers is
empty");
try {
- RaftClientReply reply =
raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
+ RaftClientReply reply =
raftClient.io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
.toReadLogRequestProto(parent.getName(), currentRecordId,
buffers.length).toByteString()));
if (reply.getException() != null) {
throw new IOException(reply.getException());
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
index f029fb3..8eff2e1 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
@@ -98,7 +98,7 @@ public class LogStreamImpl implements LogStream {
}
@Override public State getState() throws IOException {
- RaftClientReply reply = raftClient.sendReadOnly(
+ RaftClientReply reply = raftClient.io().sendReadOnly(
Message.valueOf(LogServiceProtoUtil.toGetStateRequestProto(name).toByteString()));
LogServiceProtos.GetStateReplyProto proto =
LogServiceProtos.GetStateReplyProto.parseFrom(reply.getMessage().getContent());
@@ -108,7 +108,7 @@ public class LogStreamImpl implements LogStream {
@Override
public long getSize() throws IOException{
RaftClientReply reply = raftClient
- .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+ .io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
.toGetSizeRequestProto(name).toByteString()));
if (reply.getException() != null) {
throw new IOException(reply.getException());
@@ -126,7 +126,7 @@ public class LogStreamImpl implements LogStream {
@Override
public long getLength() throws IOException {
RaftClientReply reply = raftClient
- .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+ .io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
.toGetLengthRequestProto(name).toByteString()));
if (reply.getException() != null) {
throw new IOException(reply.getException());
@@ -155,7 +155,7 @@ public class LogStreamImpl implements LogStream {
public long getLastRecordId() throws IOException {
try {
RaftClientReply reply = raftClient
- .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+ .io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
.toGetLastCommittedIndexRequestProto(name).toByteString()));
if (reply.getException() != null) {
throw new IOException(reply.getException());
@@ -177,7 +177,7 @@ public class LogStreamImpl implements LogStream {
public long getStartRecordId() throws IOException {
try {
RaftClientReply reply = raftClient
- .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+ .io().sendReadOnly(Message.valueOf(LogServiceProtoUtil
.toGetStartIndexProto(name).toByteString()));
if (reply.getException() != null) {
throw new IOException(reply.getException());
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
index e292739..39f1a43 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogWriterImpl.java
@@ -67,7 +67,7 @@ public class LogWriterImpl implements LogWriter {
@Override
public List<Long> write(List<ByteBuffer> list) throws IOException {
try {
- RaftClientReply reply = raftClient.send(
+ RaftClientReply reply = raftClient.io().send(
Message.valueOf(LogServiceProtoUtil.toAppendBBEntryLogRequestProto(parent.getName(),
list).toByteString()));
if (reply.getException() != null) {
throw new IOException(reply.getException());
@@ -89,7 +89,7 @@ public class LogWriterImpl implements LogWriter {
@Override
public long sync() throws IOException {
try {
- RaftClientReply reply = raftClient.send(Message
+ RaftClientReply reply = raftClient.io().send(Message
.valueOf(LogServiceProtoUtil.toSyncLogRequestProto(parent.getName()).toByteString()));
if (reply.getException() != null) {
throw new IOException(reply.getException());
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
index b68a763..f1d9cf9 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
@@ -145,7 +145,7 @@ public class LogServer extends BaseServer {
.setClientId(ClientId.randomId())
.setProperties(properties)
.build();
- metaClient.send(() ->
MetaServiceProtoUtil.toPingRequestProto(peer).toByteString());
+ metaClient.io().send(() ->
MetaServiceProtoUtil.toPingRequestProto(peer).toByteString());
daemon = new Daemon(new HeartbeatSender(new
RaftPeer(raftServer.getId())),
"heartbeat-Sender"+raftServer.getId());
daemon.start();
@@ -200,7 +200,7 @@ public class LogServer extends BaseServer {
while (true) {
try {
- metaClient.send(() -> MetaServiceProtoUtil.
+ metaClient.io().send(() -> MetaServiceProtoUtil.
toHeartbeatRequestProto(peer).toByteString());
Thread.sleep(heartbeatInterval);
} catch (InterruptedException e) {
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index f49f9dd..45ac3d0 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -722,7 +722,7 @@ public class LogStateMachine extends BaseStateMachine {
private void sendArchiveLogrequestToNewLeader(long recordId, LogName
logName, String location)
throws IOException {
- getClient().sendReadOnly(() -> LogServiceProtoUtil
+ getClient().io().sendReadOnly(() -> LogServiceProtoUtil
.toArchiveLogRequestProto(logName, location, recordId,
isArchivalRequest,
ArchivalStatus.INTERRUPTED).toByteString());
}
@@ -744,7 +744,7 @@ public class LogStateMachine extends BaseStateMachine {
private void updateArchivingInfo(long recordId, LogName logName, String
location,
boolean isArchival, ArchivalStatus status)
throws IOException {
- RaftClientReply archiveLogReply = getClient().send(() ->
LogServiceProtoUtil
+ RaftClientReply archiveLogReply = getClient().io().send(() ->
LogServiceProtoUtil
.toArchiveLogRequestProto(logName, location, recordId, isArchival,
status).toByteString());
LogServiceProtos.ArchiveLogReplyProto
message=LogServiceProtos.ArchiveLogReplyProto
.parseFrom(archiveLogReply.getMessage().getContent());
@@ -754,7 +754,7 @@ public class LogStateMachine extends BaseStateMachine {
}
private void sendChangeStateRequest(State st, boolean force) throws
IOException {
- getClient().send(
+ getClient().io().send(
() ->
LogServiceProtoUtil.toChangeStateRequestProto(LogName.of("Dummy"), st, force)
.toByteString());
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
index 2982595..6988d1a 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -270,7 +270,7 @@ public class MetaStateMachine extends BaseStateMachine {
});
try (RaftClient client =
RaftClient.newBuilder().setRaftGroup(currentGroup)
.setClientId(ClientId.randomId()).setProperties(properties).build()){
- client.send(() ->
MetaServiceProtos.MetaSMRequestProto.newBuilder()
+ client.io().send(() ->
MetaServiceProtos.MetaSMRequestProto.newBuilder()
.setUnregisterRequest(
LogServiceUnregisterLogRequestProto.newBuilder()
.setLogname(LogServiceProtoUtil.toLogNameProto(logName)))
@@ -352,7 +352,7 @@ public class MetaStateMachine extends BaseStateMachine {
}
try (RaftClient client =
RaftClient.newBuilder().setRaftGroup(currentGroup)
.setClientId(ClientId.randomId()).setProperties(properties).build()){
- client.send(() ->
MetaServiceProtos.MetaSMRequestProto.newBuilder()
+ client.io().send(() ->
MetaServiceProtos.MetaSMRequestProto.newBuilder()
.setRegisterRequest(LogServiceRegisterLogRequestProto.newBuilder()
.setLogname(LogServiceProtoUtil.toLogNameProto(name))
.setRaftGroup(MetaServiceProtoUtil
@@ -451,7 +451,7 @@ public class MetaStateMachine extends BaseStateMachine {
LOG.warn(String.format("Peer %s in the
group %s went down." +
" Hence closing the
log %s serve by the group.",
raftPeer.toString(),
group.toString(), logName.toString()));
- RaftClientReply reply = client.send(
+ RaftClientReply reply =
client.io().send(
() -> LogServiceProtoUtil.
toChangeStateRequestProto(logName, LogStream.State.CLOSED, true)
.toByteString());
diff --git
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 9cf4652..c41a284 100644
---
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -123,7 +123,7 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
try(final RaftClient client = cluster.createClient(leaderId)) {
for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
RaftClientReply
- reply = client.send(new RaftTestUtil.SimpleMessage("m" + i));
+ reply = client.io().send(new RaftTestUtil.SimpleMessage("m" +
i));
Assert.assertTrue(reply.isSuccess());
}
}
@@ -159,7 +159,7 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
// generate some more traffic
try(final RaftClient client =
cluster.createClient(cluster.getLeader().getId())) {
- Assert.assertTrue(client.send(new RaftTestUtil.SimpleMessage("m" +
i)).isSuccess());
+ Assert.assertTrue(client.io().send(new RaftTestUtil.SimpleMessage("m"
+ i)).isSuccess());
}
final SnapshotInfo leaderSnapshotInfo =
cluster.getLeader().getStateMachine().getLatestSnapshot();
@@ -203,7 +203,7 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
try (final RaftClient client = cluster.createClient(leaderId)) {
for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
- final RaftClientReply reply = client.send(new
RaftTestUtil.SimpleMessage("m" + i));
+ final RaftClientReply reply = client.io().send(new
RaftTestUtil.SimpleMessage("m" + i));
Assert.assertTrue(reply.isSuccess());
}
}
@@ -225,7 +225,7 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
// generate some more traffic
try (final RaftClient client = cluster.createClient(leader.getId())) {
- Assert.assertTrue(client.send(new RaftTestUtil.SimpleMessage("m" +
i)).isSuccess());
+ Assert.assertTrue(client.io().send(new RaftTestUtil.SimpleMessage("m" +
i)).isSuccess());
}
FIVE_SECONDS.sleep();
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index 2f87cd8..8a40009 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -101,7 +101,7 @@ public abstract class LogAppenderTests<CLUSTER extends
MiniRaftCluster>
try {
latch.await();
for (SimpleMessage msg : messages) {
- client.send(msg);
+ client.io().send(msg);
}
client.close();
succeed.set(true);
@@ -134,7 +134,7 @@ public abstract class LogAppenderTests<CLUSTER extends
MiniRaftCluster>
// Write 10 messages to leader.
try(RaftClient client = cluster.createClient(leaderServer.getId())) {
for (int i = 1; i <= 10; i++) {
- client.send(new RaftTestUtil.SimpleMessage("Msg to make leader ready "
+ i));
+ client.io().send(new RaftTestUtil.SimpleMessage("Msg to make leader
ready " + i));
}
} catch (IOException e) {
throw e;
diff --git
a/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
b/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
index 5da5c91..f49564c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
@@ -73,11 +73,11 @@ public abstract class MessageStreamApiTests<CLUSTER extends
MiniRaftCluster> ext
final String k = key.toString();
try(RaftClient client = cluster.createClient()) {
final String k1 = k.substring(0, endOfRequest);
- final RaftClientReply r1= client.sendReadOnly(new SimpleMessage(k1));
+ final RaftClientReply r1= client.io().sendReadOnly(new
SimpleMessage(k1));
Assert.assertTrue(r1.isSuccess());
final String k2 = k.substring(endOfRequest);
- final RaftClientReply r2 = client.sendReadOnly(new SimpleMessage(k2));
+ final RaftClientReply r2 = client.io().sendReadOnly(new
SimpleMessage(k2));
Assert.assertTrue(r2.isSuccess());
}
}
@@ -119,7 +119,7 @@ public abstract class MessageStreamApiTests<CLUSTER extends
MiniRaftCluster> ext
// check if all the parts are streamed as a single message.
try(RaftClient client = cluster.createClient()) {
- final RaftClientReply reply = client.sendReadOnly(new
SimpleMessage(bytes.toString(StandardCharsets.UTF_8)));
+ final RaftClientReply reply = client.io().sendReadOnly(new
SimpleMessage(bytes.toString(StandardCharsets.UTF_8)));
Assert.assertTrue(reply.isSuccess());
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 2b7a19f..fb6050a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -492,7 +492,7 @@ public abstract class MiniRaftCluster implements Closeable {
public RaftServerImpl getLeaderAndSendFirstMessage(boolean ignoreException)
throws IOException {
final RaftServerImpl leader = getLeader();
try(RaftClient client = createClient(leader.getId())) {
- client.send(new RaftTestUtil.SimpleMessage("first msg to make leader
ready"));
+ client.io().send(new RaftTestUtil.SimpleMessage("first msg to make
leader ready"));
} catch (IOException e) {
if (!ignoreException) {
throw e;
diff --git
a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
index 23b63fe..2a47164 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
@@ -99,7 +99,7 @@ public abstract class RaftAsyncExceptionTests<CLUSTER extends
MiniRaftCluster>
private void runTestTimeoutException(CLUSTER cluster) throws Exception {
// send a message to make sure the cluster is working
try(RaftClient client = cluster.createClient()) {
- final RaftClientReply reply = client.send(new SimpleMessage("m0"));
+ final RaftClientReply reply = client.io().send(new SimpleMessage("m0"));
Assert.assertTrue(reply.isSuccess());
RaftClientConfigKeys.Rpc.setRequestTimeout(properties.get(), ONE_SECOND);
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 7e1e47e..48d310b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -147,7 +147,7 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
}
});
} else {
- final RaftClientReply reply = client.send(message);
+ final RaftClientReply reply = client.io().send(message);
Assert.assertTrue(reply.isSuccess());
}
}
@@ -288,7 +288,7 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
for (int i = 0; i < messages.length; i++) {
if (!useAsync) {
final RaftClientReply reply =
- client.send(messages[step.getAndIncrement()]);
+ client.io().send(messages[step.getAndIncrement()]);
Assert.assertTrue(reply.isSuccess());
} else {
final CompletableFuture<RaftClientReply> replyFuture =
@@ -436,7 +436,7 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
CompletableFuture<RaftClientReply> replyFuture =
client.async().send(new SimpleMessage("abc"));
replyFuture.get();
} else {
- client.send(new SimpleMessage("abc"));
+ client.io().send(new SimpleMessage("abc"));
}
// Eventually the request would be accepted by the server
// when the retry cache entry is invalidated.
@@ -467,7 +467,7 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
CompletableFuture<RaftClientReply> replyFuture =
client.async().send(new SimpleMessage("abc"));
replyFuture.get();
} else {
- client.send(new SimpleMessage("abc"));
+ client.io().send(new SimpleMessage("abc"));
}
long appliedIndexAfter = (Long) appliedIndexGauge.getValue();
diff --git
a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 8b6fd2d..bebf940 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -91,7 +91,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends
MiniRaftCluster>
}
static void sendMessage(String message, RaftClient client) throws
IOException {
- final RaftClientReply reply = client.send(new SimpleMessage(message));
+ final RaftClientReply reply = client.io().send(new SimpleMessage(message));
Assert.assertTrue(reply.isSuccess());
}
@@ -149,11 +149,11 @@ public abstract class RaftExceptionBaseTest<CLUSTER
extends MiniRaftCluster>
// Create client using another group
try(RaftClient client = cluster.createClient(anotherGroup)) {
testFailureCase("send(..) with client group being different from the
server group",
- () -> client.send(Message.EMPTY),
+ () -> client.io().send(Message.EMPTY),
GroupMismatchException.class);
testFailureCase("sendReadOnly(..) with client group being different from
the server group",
- () -> client.sendReadOnly(Message.EMPTY),
+ () -> client.io().sendReadOnly(Message.EMPTY),
GroupMismatchException.class);
testFailureCase("setConfiguration(..) with client group being different
from the server group",
@@ -177,7 +177,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends
MiniRaftCluster>
try (RaftClient client = cluster.createClient()) {
final RaftPeerId follower =
cluster.getFollowers().iterator().next().getId();
testFailureCase("sendStaleRead(..) with a large commit index",
- () -> client.sendStaleRead(Message.EMPTY, 1_000_000_000L, follower),
+ () -> client.io().sendStaleRead(Message.EMPTY, 1_000_000_000L,
follower),
StateMachineException.class, StaleReadException.class);
}
}
@@ -194,7 +194,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends
MiniRaftCluster>
SimpleMessage msg = new SimpleMessage(new String(bytes));
try (RaftClient client = cluster.createClient(leaderId)) {
testFailureCase("testLogAppenderBufferCapacity",
- () -> client.send(msg),
+ () -> client.io().send(msg),
StateMachineException.class, RaftLogIOException.class);
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index f743bc9..05435ca 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -456,7 +456,7 @@ public interface RaftTestUtil {
Thread t = new Thread(() -> {
try (final RaftClient client = cluster.createClient(leaderId)) {
for (SimpleMessage mssg: messages) {
- client.send(mssg);
+ client.io().send(mssg);
}
} catch (Exception e) {
e.printStackTrace();
diff --git
a/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
index df7ba15..a1084f8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
@@ -99,14 +99,14 @@ public abstract class RequestLimitAsyncBaseTest<CLUSTER
extends MiniRaftCluster>
try(RaftClient c2 = cluster.createClient(leader.getId(),
RetryPolicies.noRetry())) {
// more write requests should get ResourceUnavailableException
final SimpleMessage message = new SimpleMessage("err");
- testFailureCase("send should fail", () -> c2.send(message),
+ testFailureCase("send should fail", () -> c2.io().send(message),
ResourceUnavailableException.class);
testFailureCase("sendAsync should fail", () ->
c2.async().send(message).get(),
ExecutionException.class, ResourceUnavailableException.class);
// more watch requests should get ResourceUnavailableException
final long watchIndex = watchBase + watchElementLimit;
- testFailureCase("sendWatch should fail", () ->
c2.sendWatch(watchIndex, ReplicationLevel.ALL),
+ testFailureCase("sendWatch should fail", () ->
c2.io().watch(watchIndex, ReplicationLevel.ALL),
ResourceUnavailableException.class);
testFailureCase("sendWatchAsync should fail", () ->
c2.async().watch(watchIndex, ReplicationLevel.ALL).get(),
ExecutionException.class, ResourceUnavailableException.class);
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupInfoBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupInfoBaseTest.java
index bde28a7..0e563eb 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupInfoBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupInfoBaseTest.java
@@ -121,7 +121,7 @@ public abstract class GroupInfoBaseTest<CLUSTER extends
MiniRaftCluster>
RaftClientReply reply = null;
try(final RaftClient client = cluster.createClient()) {
for(int i = 0; i < n; i++) {
- reply = client.send(Message.valueOf("m" + i));
+ reply = client.io().send(Message.valueOf("m" + i));
}
}
return reply;
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index f8ebf4f..3c2de57 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -143,7 +143,7 @@ public abstract class GroupManagementBaseTest extends
BaseTest {
// when suggested leader rejoin cluster, it will catch up log first.
try (final RaftClient client = cluster.createClient(newGroup)) {
for (int i = 0; i < 10; i ++) {
- RaftClientReply reply = client.send(new RaftTestUtil.SimpleMessage("m"
+ i));
+ RaftClientReply reply = client.io().send(new
RaftTestUtil.SimpleMessage("m" + i));
Assert.assertTrue(reply.isSuccess());
}
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 1ccbc17..844d9cc 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -159,10 +159,10 @@ public abstract class LeaderElectionTests<CLUSTER extends
MiniRaftCluster>
final RaftServerImpl leader = waitForLeader(cluster);
try (RaftClient client = cluster.createClient(leader.getId())) {
- client.send(new RaftTestUtil.SimpleMessage("message"));
+ client.io().send(new RaftTestUtil.SimpleMessage("message"));
Thread.sleep(1000);
isolate(cluster, leader.getId());
- RaftClientReply reply = client.send(new
RaftTestUtil.SimpleMessage("message"));
+ RaftClientReply reply = client.io().send(new
RaftTestUtil.SimpleMessage("message"));
Assert.assertNotEquals(reply.getReplierId(),
leader.getId().toString());
Assert.assertTrue(reply.isSuccess());
} finally {
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index ae38622..b54fbdc 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -196,7 +196,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER
extends MiniRaftCluste
// submit some msgs before reconf
for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) {
- RaftClientReply reply = client.send(new SimpleMessage("m" + i));
+ RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
Assert.assertTrue(reply.isSuccess());
}
@@ -326,7 +326,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER
extends MiniRaftCluste
// submit some msgs before reconf
for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) {
- RaftClientReply reply = client.send(new SimpleMessage("m" + i));
+ RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
Assert.assertTrue(reply.isSuccess());
}
@@ -465,7 +465,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER
extends MiniRaftCluste
void runTestNoChangeRequest(CLUSTER cluster) throws Exception {
final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
try(final RaftClient client = cluster.createClient(leader.getId())) {
- client.send(new SimpleMessage("m"));
+ client.io().send(new SimpleMessage("m"));
final RaftLog leaderLog = leader.getState().getLog();
final long committedIndex = leaderLog.getLastCommittedIndex();
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index e6e5c2b..7406c44 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -87,7 +87,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER
extends MiniRaftClu
RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
try(final RaftClient client = cluster.createClient(leaderId)) {
- client.send(new RaftTestUtil.SimpleMessage("m"));
+ client.io().send(new RaftTestUtil.SimpleMessage("m"));
fail("Exception expected");
} catch (StateMachineException e) {
e.printStackTrace();
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
index 112f118..a6ffb30 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
@@ -90,13 +90,13 @@ public abstract class StateMachineShutdownTests<CLUSTER
extends MiniRaftCluster>
cluster.getLeaderAndSendFirstMessage(true);
try (final RaftClient client = cluster.createClient(leaderId)) {
- client.send(new RaftTestUtil.SimpleMessage("message"));
- RaftClientReply reply = client.send(
+ client.io().send(new RaftTestUtil.SimpleMessage("message"));
+ RaftClientReply reply = client.io().send(
new RaftTestUtil.SimpleMessage("message2"));
long logIndex = reply.getLogIndex();
//Confirm that followers have committed
- RaftClientReply watchReply = client.sendWatch(
+ RaftClientReply watchReply = client.io().watch(
logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED);
watchReply.getCommitInfos().forEach(
val -> Assert.assertTrue(val.getCommitIndex() >= logIndex));
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index e9b90a2..1945e91 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -145,7 +145,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest
{
int i = 0;
try(final RaftClient client = cluster.createClient(leaderId)) {
for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
- RaftClientReply reply = client.send(new SimpleMessage("m" + i));
+ RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
Assert.assertTrue(reply.isSuccess());
}
}
@@ -192,7 +192,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest
{
try(final RaftClient client = cluster.createClient(leaderId)) {
for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
- RaftClientReply reply = client.send(new SimpleMessage("m" + i));
+ RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
Assert.assertTrue(reply.isSuccess());
}
}
@@ -227,7 +227,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest
{
// generate some more traffic
try(final RaftClient client =
cluster.createClient(cluster.getLeader().getId())) {
- Assert.assertTrue(client.send(new SimpleMessage("m" + i)).isSuccess());
+ Assert.assertTrue(client.io().send(new SimpleMessage("m" +
i)).isSuccess());
}
// add two more peers
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
index 6c37589..8a47f5d 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -63,7 +63,7 @@ public class TestLogAppenderWithGrpc
// client and leader setup
try (final RaftClient client = cluster.createClient(cluster.getGroup())) {
- client.send(new RaftTestUtil.SimpleMessage("m"));
+ client.io().send(new RaftTestUtil.SimpleMessage("m"));
RaftServerImpl leader = waitForLeader(cluster);
long initialNextIndex = leader.getState().getNextIndex();
@@ -104,7 +104,7 @@ public class TestLogAppenderWithGrpc
// Send some messages
try(RaftClient client = cluster.createClient(leader.getId())) {
for(int i = 0; i < 10; i++) {
- final RaftClientReply reply = client.send(new
RaftTestUtil.SimpleMessage("m" + ++messageCount));
+ final RaftClientReply reply = client.io().send(new
RaftTestUtil.SimpleMessage("m" + ++messageCount));
Assert.assertTrue(reply.isSuccess());
}
}
@@ -121,7 +121,7 @@ public class TestLogAppenderWithGrpc
// Send some more messages
try(RaftClient client = cluster.createClient(leader.getId())) {
for(int i = 0; i < 10; i++) {
- final RaftClientReply reply = client.send(new
RaftTestUtil.SimpleMessage("m" + ++messageCount));
+ final RaftClientReply reply = client.io().send(new
RaftTestUtil.SimpleMessage("m" + ++messageCount));
Assert.assertTrue(reply.isSuccess());
}
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index f8cec9a..4814d25 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -143,7 +143,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
for(int i = 0; i < messageCount.get(); i++) {
if (i != truncatedMessageIndex) {
final Message m = new SimpleMessage("m" + i);
- final RaftClientReply reply = client.sendReadOnly(m);
+ final RaftClientReply reply = client.io().sendReadOnly(m);
Assert.assertTrue(reply.isSuccess());
LOG.info("query {}: {} {}", m, reply,
LogEntryProto.parseFrom(reply.getMessage().getContent()));
}
@@ -155,7 +155,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
try(final RaftClient client = cluster.createClient()) {
// write some messages
for(int i = 0; i < 10; i++) {
- Assert.assertTrue(client.send(newMessage.get()).isSuccess());
+ Assert.assertTrue(client.io().send(newMessage.get()).isSuccess());
}
}
}
@@ -239,7 +239,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
final SimpleMessage m = messages[i];
new Thread(() -> {
try (final RaftClient client = cluster.createClient()) {
- Assert.assertTrue(client.send(m).isSuccess());
+ Assert.assertTrue(client.io().send(m).isSuccess());
} catch (IOException e) {
throw new IllegalStateException("Failed to send " + m, e);
}
@@ -356,11 +356,11 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
final SimpleMessage lastMessage = messages[messages.length - 1];
try (final RaftClient client = cluster.createClient()) {
for (SimpleMessage m : messages) {
- Assert.assertTrue(client.send(m).isSuccess());
+ Assert.assertTrue(client.io().send(m).isSuccess());
}
// assert that the last message exists
- Assert.assertTrue(client.sendReadOnly(lastMessage).isSuccess());
+ Assert.assertTrue(client.io().sendReadOnly(lastMessage).isSuccess());
}
final RaftLog log = leader.getState().getLog();
@@ -382,7 +382,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
cluster.restartServer(id, false);
testFailureCase("last-entry-not-found", () -> {
try (final RaftClient client = cluster.createClient()) {
- client.sendReadOnly(lastMessage);
+ client.io().sendReadOnly(lastMessage);
}
}, StateMachineException.class, IndexOutOfBoundsException.class);
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
index 737c8a9..dfb8af6 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
@@ -30,8 +30,6 @@ import static
org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_ENQUE
import static
org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_EXECUTION_TIME;
import static
org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_QUEUE_TIME;
import static
org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_WORKER_QUEUE_SIZE;
-import static
org.apache.ratis.server.metrics.RaftLogMetrics.METADATA_LOG_ENTRY_COUNT;
-import static
org.apache.ratis.server.metrics.RaftLogMetrics.CONFIG_LOG_ENTRY_COUNT;
import static
org.apache.ratis.server.metrics.RaftLogMetrics.STATE_MACHINE_LOG_ENTRY_COUNT;
import static
org.apache.ratis.metrics.RatisMetrics.RATIS_APPLICATION_NAME_METRICS;
@@ -104,7 +102,7 @@ public class TestRaftLogMetrics extends BaseTest
try (final RaftClient client = cluster.createClient()) {
for (RaftTestUtil.SimpleMessage message : messages) {
- client.send(message);
+ client.io().send(message);
}
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
index 6c02e72..180f970 100644
---
a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
+++
b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
@@ -144,7 +144,7 @@ public class TestStateMachine extends BaseTest implements
MiniRaftClusterWithSim
final RaftTestUtil.SimpleMessage[] messages =
RaftTestUtil.SimpleMessage.create(numTrx);
try(final RaftClient client = cluster.createClient()) {
for (RaftTestUtil.SimpleMessage message : messages) {
- client.send(message);
+ client.io().send(message);
}
}