This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 449a272 RATIS-1094. Move async methods from RaftClient to a new
AsyncApi interface (#220). Contributed by Rui Wang
449a272 is described below
commit 449a272debc1a3aebea6ecbd40ac6356d23ba855
Author: Rui Wang <[email protected]>
AuthorDate: Mon Oct 12 21:08:17 2020 -0700
RATIS-1094. Move async methods from RaftClient to a new AsyncApi interface
(#220). Contributed by Rui Wang
---
.../java/org/apache/ratis/client/RaftClient.java | 24 ++--------
.../java/org/apache/ratis/client/api/AsyncApi.java | 48 +++++++++++++++++++
.../org/apache/ratis/client/impl/AsyncImpl.java | 56 ++++++++++++++++++++++
.../apache/ratis/client/impl/RaftClientImpl.java | 28 +++--------
.../apache/ratis/client/impl/RaftOutputStream.java | 4 +-
.../ratis/examples/filestore/FileStoreClient.java | 4 +-
.../org/apache/ratis/RaftAsyncExceptionTests.java | 6 +--
.../test/java/org/apache/ratis/RaftAsyncTests.java | 28 +++++------
.../test/java/org/apache/ratis/RaftBasicTests.java | 8 ++--
.../apache/ratis/RequestLimitAsyncBaseTest.java | 10 ++--
.../java/org/apache/ratis/WatchRequestTests.java | 17 +++----
.../apache/ratis/grpc/TestGrpcMessageMetrics.java | 3 +-
.../apache/ratis/grpc/TestLogAppenderWithGrpc.java | 2 +-
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 20 ++++----
.../org/apache/ratis/grpc/TestRaftWithGrpc.java | 2 +-
.../ratis/retry/TestExceptionDependentRetry.java | 4 +-
16 files changed, 170 insertions(+), 94 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 15549a8..7794817 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -18,6 +18,7 @@
package org.apache.ratis.client;
import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.client.api.AsyncApi;
import org.apache.ratis.client.api.DataStreamApi;
import org.apache.ratis.client.api.GroupManagementApi;
import org.apache.ratis.client.api.MessageStreamApi;
@@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
/** A client who sends requests to a raft service. */
public interface RaftClient extends Closeable {
@@ -54,6 +54,9 @@ public interface RaftClient extends Closeable {
/** Get the {@link GroupManagementApi} for the given server. */
GroupManagementApi getGroupManagementApi(RaftPeerId server);
+ /** Get the {@link AsyncApi}. */
+ AsyncApi async();
+
/** @return the {@link MessageStreamApi}. */
MessageStreamApi getMessageStreamApi();
@@ -65,25 +68,6 @@ public interface RaftClient extends Closeable {
}
/**
- * Async call to send the given message to the raft service.
- * The message may change the state of the service.
- * For readonly messages, use {@link #sendReadOnlyAsync(Message)} instead.
- *
- * @param message The request message.
- * @return a future of the reply.
- */
- CompletableFuture<RaftClientReply> sendAsync(Message message);
-
- /** Async call to send the given readonly message to the raft service. */
- CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message);
-
- /** Async call to send the given stale-read message to the given server (not
the raft service). */
- CompletableFuture<RaftClientReply> sendStaleReadAsync(Message message, long
minIndex, RaftPeerId server);
-
- /** Async call to watch the given index to satisfy the given replication
level. */
- CompletableFuture<RaftClientReply> sendWatchAsync(long index,
ReplicationLevel replication);
-
- /**
* Send the given message to the raft service.
* The message may change the state of the service.
* For readonly messages, use {@link #sendReadOnly(Message)} instead.
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
new file mode 100644
index 0000000..de8320e
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.api;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+
+/**
+ * APIs to support asynchronous operations such as send message, send
(stale)read message and watch request.
+ */
+public interface AsyncApi {
+ /**
+ * Async call to send the given message to the raft service.
+ * The message may change the state of the service.
+ * For readonly messages, use {@link #sendReadOnly(Message)} instead.
+ *
+ * @param message The request message.
+ * @return a future of the reply.
+ */
+ CompletableFuture<RaftClientReply> send(Message message);
+
+ /** Async call to send the given readonly message to the raft service. */
+ CompletableFuture<RaftClientReply> sendReadOnly(Message message);
+
+ /** Async call to send the given stale-read message to the given server (not
the raft service). */
+ CompletableFuture<RaftClientReply> sendStaleRead(Message message, long
minIndex, RaftPeerId server);
+
+ /** Async call to watch the given index to satisfy the given replication
level. */
+ CompletableFuture<RaftClientReply> watch(long index, ReplicationLevel
replication);
+}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
new file mode 100644
index 0000000..09c3ae5
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ratis.client.api.AsyncApi;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
+
+/** Async api implementations. */
+class AsyncImpl implements AsyncApi {
+ private final RaftClientImpl client;
+
+ AsyncImpl(RaftClientImpl client) {
+ this.client = Objects.requireNonNull(client, "client == null");
+ }
+
+ @Override
+ public CompletableFuture<RaftClientReply> send(Message message) {
+ return client.sendAsync(RaftClientRequest.writeRequestType(), message,
null);
+ }
+
+ @Override
+ public CompletableFuture<RaftClientReply> sendReadOnly(Message message) {
+ return client.sendAsync(RaftClientRequest.readRequestType(), message,
null);
+ }
+
+ @Override
+ public CompletableFuture<RaftClientReply> sendStaleRead(Message message,
long minIndex, RaftPeerId server) {
+ return client.sendAsync(RaftClientRequest.staleReadRequestType(minIndex),
message, server);
+ }
+
+ @Override
+ public CompletableFuture<RaftClientReply> watch(long index, ReplicationLevel
replication) {
+ return UnorderedAsync.send(RaftClientRequest.watchRequestType(index,
replication), client);
+ }
+}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index f9f985f..25b4eab 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.client.impl;
+import org.apache.ratis.client.api.AsyncApi;
import org.apache.ratis.client.api.GroupManagementApi;
import org.apache.ratis.client.retry.ClientRetryEvent;
import org.apache.ratis.client.RaftClient;
@@ -186,26 +187,6 @@ public final class RaftClientImpl implements RaftClient {
return streamApi.get();
}
- @Override
- public CompletableFuture<RaftClientReply> sendAsync(Message message) {
- return sendAsync(RaftClientRequest.writeRequestType(), message, null);
- }
-
- @Override
- public CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message)
{
- return sendAsync(RaftClientRequest.readRequestType(), message, null);
- }
-
- @Override
- public CompletableFuture<RaftClientReply> sendStaleReadAsync(Message
message, long minIndex, RaftPeerId server) {
- return sendAsync(RaftClientRequest.staleReadRequestType(minIndex),
message, server);
- }
-
- @Override
- public CompletableFuture<RaftClientReply> sendWatchAsync(long index,
ReplicationLevel replication) {
- return UnorderedAsync.send(RaftClientRequest.watchRequestType(index,
replication), this);
- }
-
CompletableFuture<RaftClientReply> streamAsync(long streamId, long
messageId, Message message, boolean endOfRequest) {
return sendAsync(RaftClientRequest.streamRequestType(streamId, messageId,
endOfRequest), message, null);
}
@@ -214,7 +195,7 @@ public final class RaftClientImpl implements RaftClient {
return sendAsync(RaftClientRequest.streamRequestType(streamId, messageId,
true), null, null);
}
- private CompletableFuture<RaftClientReply> sendAsync(
+ CompletableFuture<RaftClientReply> sendAsync(
RaftClientRequest.Type type, Message message, RaftPeerId server) {
return getOrderedAsync().send(type, message, server);
}
@@ -271,6 +252,11 @@ public final class RaftClientImpl implements RaftClient {
}
@Override
+ public AsyncApi async() {
+ return new AsyncImpl(this);
+ }
+
+ @Override
public GroupManagementApi getGroupManagementApi(RaftPeerId server) {
return new GroupManagementImpl(server, this);
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftOutputStream.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftOutputStream.java
index 1cdbeac..2286d4c 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftOutputStream.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftOutputStream.java
@@ -32,7 +32,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
-/** An {@link OutputStream} implementation using {@link
RaftClient#sendAsync(Message)} API. */
+/** An {@link OutputStream} implementation using {@link
org.apache.ratis.client.api.AsyncApi#send(Message)} API. */
public class RaftOutputStream extends OutputStream {
private final Supplier<RaftClient> client;
private final AtomicBoolean closed = new AtomicBoolean();
@@ -87,7 +87,7 @@ public class RaftOutputStream extends OutputStream {
return;
}
- final CompletableFuture<Long> f = getClient().sendAsync(
+ final CompletableFuture<Long> f = getClient().async().send(
Message.valueOf(ProtoUtils.toByteString(buffer, 0, byteCount))
).thenApply(reply -> RaftClientImpl.handleRaftException(reply,
CompletionException::new)
).thenApply(reply -> reply != null && reply.isSuccess()? pos: null);
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index 639cfdb..f6269bb 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -103,11 +103,11 @@ public class FileStoreClient implements Closeable {
}
private CompletableFuture<ByteString> sendAsync(ByteString request) {
- return sendAsync(request, client::sendAsync);
+ return sendAsync(request, client.async()::send);
}
private CompletableFuture<ByteString> sendReadOnlyAsync(ByteString request) {
- return sendAsync(request, client::sendReadOnlyAsync);
+ return sendAsync(request, client.async()::sendReadOnly);
}
public ByteString read(String path, long offset, long length) throws
IOException {
diff --git
a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
index 09ddec8..23b63fe 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
@@ -59,7 +59,7 @@ public abstract class RaftAsyncExceptionTests<CLUSTER extends
MiniRaftCluster>
private void runTestGroupMismatchException(CLUSTER cluster) throws Exception
{
// send a message to make sure the cluster is working
try(RaftClient client = cluster.createClient()) {
- final RaftClientReply reply = client.sendAsync(new
SimpleMessage("first")).get();
+ final RaftClientReply reply = client.async().send(new
SimpleMessage("first")).get();
Assert.assertTrue(reply.isSuccess());
}
@@ -74,7 +74,7 @@ public abstract class RaftAsyncExceptionTests<CLUSTER extends
MiniRaftCluster>
// send a few messages
final List<CompletableFuture<RaftClientReply>> futures = new
ArrayList<>();
for(SimpleMessage m : messages) {
- futures.add(client.sendAsync(m));
+ futures.add(client.async().send(m));
}
Assert.assertEquals(messages.length, futures.size());
@@ -108,7 +108,7 @@ public abstract class RaftAsyncExceptionTests<CLUSTER
extends MiniRaftCluster>
.map(cluster::getRaftServerImpl)
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::blockStartTransaction);
- final CompletableFuture<RaftClientReply> replyFuture =
client.sendAsync(new SimpleMessage("m1"));
+ final CompletableFuture<RaftClientReply> replyFuture =
client.async().send(new SimpleMessage("m1"));
FIVE_SECONDS.sleep();
// Unblock StartTransaction
cluster.getServers().stream()
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 552f049..d78fbdc 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -134,7 +134,7 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
final SimpleMessage[] messages = SimpleMessage.create(10, "initial-");
final List<CompletableFuture<RaftClientReply>> replies = new
ArrayList<>();
for (int i = 0; i < messages.length; i++) {
- replies.add(client.sendAsync(messages[i]));
+ replies.add(client.async().send(messages[i]));
}
for (int i = 0; i < messages.length; i++) {
RaftTestUtil.assertSuccessReply(replies.get(i));
@@ -151,7 +151,7 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
int i = 0;
// send half of the calls without starting the cluster
for (; i < messages.length/2; i++) {
- replies.add(client.sendAsync(messages[i]));
+ replies.add(client.async().send(messages[i]));
}
// sleep most of the retry time
@@ -159,7 +159,7 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
// send another half of the calls without starting the cluster
for (; i < messages.length; i++) {
- replies.add(client.sendAsync(messages[i]));
+ replies.add(client.async().send(messages[i]));
}
Assert.assertEquals(messages.length, replies.size());
}
@@ -188,7 +188,7 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
}
}
- testFailureCaseAsync("last-request", () -> client.sendAsync(new
SimpleMessage("last")),
+ testFailureCaseAsync("last-request", () -> client.async().send(new
SimpleMessage("last")),
AlreadyClosedException.class, RaftRetryFailureException.class);
}
}
@@ -215,14 +215,14 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
AtomicInteger blockedRequestsCount = new AtomicInteger();
for (int i = 0; i < numMessages; i++) {
blockedRequestsCount.getAndIncrement();
- futures[i] = client.sendAsync(messages[i]);
+ futures[i] = client.async().send(messages[i]);
blockedRequestsCount.decrementAndGet();
}
Assert.assertEquals(0, blockedRequestsCount.get());
futures[numMessages] = CompletableFuture.supplyAsync(() -> {
blockedRequestsCount.incrementAndGet();
- client.sendAsync(new SimpleMessage("n1"));
+ client.async().send(new SimpleMessage("n1"));
blockedRequestsCount.decrementAndGet();
return null;
});
@@ -284,7 +284,7 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
for (int i = 0; i < numMesssages; i++) {
final String s = "" + i;
LOG.info("sendAsync " + s);
- futures.add(client.sendAsync(new SimpleMessage(s)));
+ futures.add(client.async().send(new SimpleMessage(s)));
}
Assert.assertEquals(numMesssages, futures.size());
final List<RaftClientReply> replies = new ArrayList<>();
@@ -310,7 +310,7 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
// test a failure case
testFailureCaseAsync("sendStaleReadAsync(..) with a larger commit index",
- () -> client.sendStaleReadAsync(
+ () -> client.async().sendStaleRead(
new SimpleMessage("" + Long.MAX_VALUE),
followerCommitInfo.getCommitIndex(), follower),
StateMachineException.class, IndexOutOfBoundsException.class);
@@ -321,12 +321,12 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
final String query = "" + i;
LOG.info("query=" + query + ", reply=" + reply);
final Message message = new SimpleMessage(query);
- final CompletableFuture<RaftClientReply> readFuture =
client.sendReadOnlyAsync(message);
+ final CompletableFuture<RaftClientReply> readFuture =
client.async().sendReadOnly(message);
futures.add(readFuture.thenCompose(r -> {
if (reply.getLogIndex() <= followerCommitIndex) {
LOG.info("sendStaleReadAsync, query=" + query);
- return client.sendStaleReadAsync(message, followerCommitIndex,
follower);
+ return client.async().sendStaleRead(message, followerCommitIndex,
follower);
} else {
return CompletableFuture.completedFuture(null);
}
@@ -389,7 +389,7 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
- CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new
SimpleMessage("abc"));
+ CompletableFuture<RaftClientReply> replyFuture = client.async().send(new
SimpleMessage("abc"));
Thread.sleep(waitTime);
// replyFuture should not be completed until append request is unblocked.
Assert.assertFalse(replyFuture.isDone());
@@ -428,7 +428,7 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
.forEach(peer -> logSyncDelay.setDelayMs(peer.getId().toString(),
1000));
// trigger append entries request
- client.sendAsync(new SimpleMessage("abc"));
+ client.async().send(new SimpleMessage("abc"));
// default max election timeout is 300ms, 1s is long enough to
// trigger failure of LeaderState::checkLeadership()
@@ -470,7 +470,7 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
// send a message to make sure that the leader is ready
try (final RaftClient client = cluster.createClient(leader.getId())) {
- final CompletableFuture<RaftClientReply> f = client.sendAsync(new
SimpleMessage("first"));
+ final CompletableFuture<RaftClientReply> f = client.async().send(new
SimpleMessage("first"));
FIVE_SECONDS.apply(f::get);
}
@@ -478,7 +478,7 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
final RetryPolicy r = event -> () -> TimeDuration.valueOf(60,
TimeUnit.SECONDS);
try (final RaftClient client =
cluster.createClient(followers.get(0).getId(), cluster.getGroup(), r)) {
- final CompletableFuture<RaftClientReply> f = client.sendAsync(new
SimpleMessage("abc"));
+ final CompletableFuture<RaftClientReply> f = client.async().send(new
SimpleMessage("abc"));
FIVE_SECONDS.apply(f::get);
} catch (TimeoutException e) {
throw new AssertionError("Failed to get async result", e);
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index c17004e..7e1e47e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -138,7 +138,7 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
for (SimpleMessage message : messages) {
if (async) {
- client.sendAsync(message).thenAcceptAsync(reply -> {
+ client.async().send(message).thenAcceptAsync(reply -> {
if (!reply.isSuccess()) {
f.completeExceptionally(
new AssertionError("Failed with reply " + reply));
@@ -292,7 +292,7 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
Assert.assertTrue(reply.isSuccess());
} else {
final CompletableFuture<RaftClientReply> replyFuture =
- client.sendAsync(messages[i]);
+ client.async().send(messages[i]);
replyFuture.thenAcceptAsync(r -> {
if (!r.isSuccess()) {
f.completeExceptionally(
@@ -433,7 +433,7 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
// Ideally the client request should timeout and the client should retry.
// The retry is successful when the retry cache entry for the
corresponding callId and clientId expires.
if (async) {
- CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new
SimpleMessage("abc"));
+ CompletableFuture<RaftClientReply> replyFuture =
client.async().send(new SimpleMessage("abc"));
replyFuture.get();
} else {
client.send(new SimpleMessage("abc"));
@@ -464,7 +464,7 @@ public abstract class RaftBasicTests<CLUSTER extends
MiniRaftCluster>
checkFollowerCommitLagsLeader(cluster);
if (async) {
- CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new
SimpleMessage("abc"));
+ CompletableFuture<RaftClientReply> replyFuture =
client.async().send(new SimpleMessage("abc"));
replyFuture.get();
} else {
client.send(new SimpleMessage("abc"));
diff --git
a/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
index cd6a312..df7ba15 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
@@ -71,7 +71,7 @@ public abstract class RequestLimitAsyncBaseTest<CLUSTER
extends MiniRaftCluster>
try (RaftClient c1 = cluster.createClient(leader.getId())) {
{ // send first message to make sure the cluster is working
final SimpleMessage message = new SimpleMessage("first");
- final CompletableFuture<RaftClientReply> future =
c1.sendAsync(message);
+ final CompletableFuture<RaftClientReply> future =
c1.async().send(message);
final RaftClientReply reply = getWithDefaultTimeout(future);
Assert.assertTrue(reply.isSuccess());
}
@@ -84,13 +84,13 @@ public abstract class RequestLimitAsyncBaseTest<CLUSTER
extends MiniRaftCluster>
final List<CompletableFuture<RaftClientReply>> writeFutures = new
ArrayList<>();
for (int i = 0; i < writeElementLimit; i++) {
final SimpleMessage message = new SimpleMessage("m" + i);
- writeFutures.add(c1.sendAsync(message));
+ writeFutures.add(c1.async().send(message));
}
// send watch requests up to the limit
final long watchBase = 1000; //watch a large index so that it won't
complete
for (int i = 0; i < watchElementLimit; i++) {
- c1.sendWatchAsync(watchBase + i, ReplicationLevel.ALL);
+ c1.async().watch(watchBase + i, ReplicationLevel.ALL);
}
// sleep to make sure that all the request were sent
@@ -101,14 +101,14 @@ public abstract class RequestLimitAsyncBaseTest<CLUSTER
extends MiniRaftCluster>
final SimpleMessage message = new SimpleMessage("err");
testFailureCase("send should fail", () -> c2.send(message),
ResourceUnavailableException.class);
- testFailureCase("sendAsync should fail", () ->
c2.sendAsync(message).get(),
+ testFailureCase("sendAsync should fail", () ->
c2.async().send(message).get(),
ExecutionException.class, ResourceUnavailableException.class);
// more watch requests should get ResourceUnavailableException
final long watchIndex = watchBase + watchElementLimit;
testFailureCase("sendWatch should fail", () ->
c2.sendWatch(watchIndex, ReplicationLevel.ALL),
ResourceUnavailableException.class);
- testFailureCase("sendWatchAsync should fail", () ->
c2.sendWatchAsync(watchIndex, ReplicationLevel.ALL).get(),
+ testFailureCase("sendWatchAsync should fail", () ->
c2.async().watch(watchIndex, ReplicationLevel.ALL).get(),
ExecutionException.class, ResourceUnavailableException.class);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 368b871..47f11a8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -98,7 +98,8 @@ public abstract class WatchRequestTests<CLUSTER extends
MiniRaftCluster>
for(int i = 0; i < numMessages; i++) {
final String message = "m" + i;
log.info("SEND_REQUEST {}: message={}", i, message);
- final CompletableFuture<RaftClientReply> replyFuture =
writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message));
+ final CompletableFuture<RaftClientReply> replyFuture =
+ writeClient.async().send(new RaftTestUtil.SimpleMessage(message));
replies.add(replyFuture);
final CompletableFuture<WatchReplies> watchFuture = new
CompletableFuture<>();
watches.add(watchFuture);
@@ -106,10 +107,10 @@ public abstract class WatchRequestTests<CLUSTER extends
MiniRaftCluster>
final long logIndex = reply.getLogIndex();
log.info("SEND_WATCH: message={}, logIndex={}", message, logIndex);
watchFuture.complete(new WatchReplies(logIndex,
- writeClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY),
- writeClient.sendWatchAsync(logIndex, ReplicationLevel.ALL),
- writeClient.sendWatchAsync(logIndex,
ReplicationLevel.MAJORITY_COMMITTED),
- writeClient.sendWatchAsync(logIndex,
ReplicationLevel.ALL_COMMITTED),
+ writeClient.async().watch(logIndex, ReplicationLevel.MAJORITY),
+ writeClient.async().watch(logIndex, ReplicationLevel.ALL),
+ writeClient.async().watch(logIndex,
ReplicationLevel.MAJORITY_COMMITTED),
+ writeClient.async().watch(logIndex,
ReplicationLevel.ALL_COMMITTED),
log));
});
}
@@ -122,11 +123,11 @@ public abstract class WatchRequestTests<CLUSTER extends
MiniRaftCluster>
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId(),
policy)) {
CompletableFuture<RaftClientReply> reply =
- watchClient.sendAsync(new RaftTestUtil.SimpleMessage("message"));
+ watchClient.async().send(new
RaftTestUtil.SimpleMessage("message"));
long writeIndex = reply.get().getLogIndex();
Assert.assertTrue(writeIndex > 0);
- watchClient.sendWatchAsync(writeIndex,
ReplicationLevel.MAJORITY_COMMITTED);
- return watchClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY);
+ watchClient.async().watch(writeIndex,
ReplicationLevel.MAJORITY_COMMITTED);
+ return watchClient.async().watch(logIndex, ReplicationLevel.MAJORITY);
}
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
index 1757874..be1e3ac 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcMessageMetrics.java
@@ -55,7 +55,8 @@ public class TestGrpcMessageMetrics extends BaseTest
static void sendMessages(MiniRaftCluster cluster) throws Exception {
waitForLeader(cluster);
try (final RaftClient client = cluster.createClient()) {
- CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new
RaftTestUtil.SimpleMessage("abc"));
+ CompletableFuture<RaftClientReply> replyFuture =
+ client.async().send(new RaftTestUtil.SimpleMessage("abc"));
}
// Wait for commits to happen on leader
JavaUtils.attempt(() -> assertMessageCount(cluster.getLeader()), 100,
HUNDRED_MILLIS, cluster.getLeader().getId() + "-assertMessageCount", null);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
index 18dab32..6c37589 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -73,7 +73,7 @@ public class TestLogAppenderWithGrpc
}
Collection<CompletableFuture<RaftClientReply>> futures = new
ArrayList<>(maxAppends * 2);
for (int i = 0; i < maxAppends * 2; i++) {
- futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage("m")));
+ futures.add(client.async().send(new RaftTestUtil.SimpleMessage("m")));
}
FIVE_SECONDS.sleep();
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 5d71f61..fda250c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -149,7 +149,7 @@ public class TestRaftServerWithGrpc extends BaseTest
implements MiniRaftClusterW
try (final RaftClient client = cluster.createClient()) {
// send a request to make sure leader is ready
- final CompletableFuture<RaftClientReply> f = client.sendAsync(new
SimpleMessage("testing"));
+ final CompletableFuture<RaftClientReply> f = client.async().send(new
SimpleMessage("testing"));
Assert.assertTrue(f.get().isSuccess());
}
@@ -211,7 +211,7 @@ public class TestRaftServerWithGrpc extends BaseTest
implements MiniRaftClusterW
try (RaftClient client = cluster.createClient()) {
// send a request to make sure leader is ready
- final CompletableFuture< RaftClientReply > f = client.sendAsync(new
SimpleMessage("testing"));
+ final CompletableFuture< RaftClientReply > f = client.async().send(new
SimpleMessage("testing"));
Assert.assertTrue(f.get().isSuccess());
}
@@ -226,7 +226,7 @@ public class TestRaftServerWithGrpc extends BaseTest
implements MiniRaftClusterW
try {
RaftClient client = cluster.createClient(cluster.getLeader().getId(),
RetryPolicies.noRetry());
clients.add(client);
- client.sendAsync(new SimpleMessage(message));
+ client.async().send(new SimpleMessage(message));
final SortedMap<String, Gauge> gaugeMap =
@@ -239,7 +239,7 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
for (int i = 0; i < 10; i++) {
client = cluster.createClient(cluster.getLeader().getId(),
RetryPolicies.noRetry());
clients.add(client);
- client.sendAsync(new SimpleMessage(message));
+ client.async().send(new SimpleMessage(message));
}
// Because we have passed 11 requests, and the element queue size is 10.
@@ -252,7 +252,7 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
// and byte size counter limit will be hit.
client = cluster.createClient(cluster.getLeader().getId(),
RetryPolicies.noRetry());
- client.sendAsync(new SimpleMessage(RandomStringUtils.random(120, true,
false)));
+ client.async().send(new SimpleMessage(RandomStringUtils.random(120,
true, false)));
clients.add(client);
RaftTestUtil.waitFor(() -> cluster.getLeader().getRaftServerMetrics()
@@ -274,24 +274,24 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
RaftServerMetrics raftServerMetrics = leader.getRaftServerMetrics();
try (final RaftClient client = cluster.createClient()) {
- final CompletableFuture<RaftClientReply> f1 = client.sendAsync(new
SimpleMessage("testing"));
+ final CompletableFuture<RaftClientReply> f1 = client.async().send(new
SimpleMessage("testing"));
Assert.assertTrue(f1.get().isSuccess());
Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_WRITE_REQUEST).getCount()
> 0);
- final CompletableFuture<RaftClientReply> f2 =
client.sendReadOnlyAsync(new SimpleMessage("testing"));
+ final CompletableFuture<RaftClientReply> f2 =
client.async().sendReadOnly(new SimpleMessage("testing"));
Assert.assertTrue(f2.get().isSuccess());
Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_READ_REQUEST).getCount()
> 0);
- final CompletableFuture<RaftClientReply> f3 =
client.sendStaleReadAsync(new SimpleMessage("testing"),
+ final CompletableFuture<RaftClientReply> f3 =
client.async().sendStaleRead(new SimpleMessage("testing"),
0, leader.getId());
Assert.assertTrue(f3.get().isSuccess());
Assert.assertTrue(raftServerMetrics.getTimer(RAFT_CLIENT_STALE_READ_REQUEST).getCount()
> 0);
- final CompletableFuture<RaftClientReply> f4 = client.sendWatchAsync(0,
RaftProtos.ReplicationLevel.ALL);
+ final CompletableFuture<RaftClientReply> f4 = client.async().watch(0,
RaftProtos.ReplicationLevel.ALL);
Assert.assertTrue(f4.get().isSuccess());
Assert.assertTrue(raftServerMetrics.getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST,
"-ALL")).getCount() > 0);
- final CompletableFuture<RaftClientReply> f5 = client.sendWatchAsync(0,
RaftProtos.ReplicationLevel.MAJORITY);
+ final CompletableFuture<RaftClientReply> f5 = client.async().watch(0,
RaftProtos.ReplicationLevel.MAJORITY);
Assert.assertTrue(f5.get().isSuccess());
Assert.assertTrue(raftServerMetrics.getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST,
"")).getCount() > 0);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 0f1d71d..915221b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -80,7 +80,7 @@ public class TestRaftWithGrpc
.forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
CompletableFuture<RaftClientReply>
- replyFuture = client.sendAsync(new
RaftTestUtil.SimpleMessage("abc"));
+ replyFuture = client.async().send(new
RaftTestUtil.SimpleMessage("abc"));
TimeDuration.valueOf(5 , TimeUnit.SECONDS).sleep();
// replyFuture should not be completed until append request is unblocked.
Assert.assertTrue(!replyFuture.isDone());
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
index 8fa02c9..66fbd71 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
@@ -185,12 +185,12 @@ public class TestExceptionDependentRetry implements MiniRaftClusterWithGrpc.Fact
// create a client with the exception dependent policy
try (final RaftClient client = cluster.createClient(builder.build())) {
- client.sendAsync(new RaftTestUtil.SimpleMessage("1")).get();
+ client.async().send(new RaftTestUtil.SimpleMessage("1")).get();
leader = cluster.getLeader();
((SimpleStateMachine4Testing)
leader.getStateMachine()).blockWriteStateMachineData();
- client.sendAsync(new RaftTestUtil.SimpleMessage("2")).get();
+ client.async().send(new RaftTestUtil.SimpleMessage("2")).get();
}
Assert.fail("Test should have failed.");
} catch (ExecutionException e) {