This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new bd4ab145d RATIS-2102. AsyncApi#send() is not handling retry and reply correctly for replication levels higher than MAJORITY (#1104)
bd4ab145d is described below
commit bd4ab145d6b1a72d8ed84eab918cda4e61f08681
Author: Siyao Meng <[email protected]>
AuthorDate: Wed May 29 11:13:47 2024 -0700
RATIS-2102. AsyncApi#send() is not handling retry and reply correctly for replication levels higher than MAJORITY (#1104)
---
.../apache/ratis/server/impl/RaftServerImpl.java | 46 ++++++++++++++++------
.../test/java/org/apache/ratis/RaftAsyncTests.java | 33 ++++++++++++++++
2 files changed, 68 insertions(+), 11 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 4512a2c22..cba3bba1c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -832,20 +832,14 @@ class RaftServerImpl implements RaftServer.Division,
leaderState.notifySenders();
}
- final CompletableFuture<RaftClientReply> future = pending.getFuture();
- if (request.is(TypeCase.WRITE)) {
- // check replication
- final ReplicationLevel replication =
request.getType().getWrite().getReplication();
- if (replication != ReplicationLevel.MAJORITY) {
- return future.thenCompose(reply -> waitForReplication(reply,
replication));
- }
- }
-
- return future;
+ return pending.getFuture();
}
/** Wait until the given replication requirement is satisfied. */
private CompletableFuture<RaftClientReply>
waitForReplication(RaftClientReply reply, ReplicationLevel replication) {
+ if (!reply.isSuccess()) {
+ return CompletableFuture.completedFuture(reply);
+ }
final RaftClientRequest.Type type =
RaftClientRequest.watchRequestType(reply.getLogIndex(), replication);
final RaftClientRequest watch = RaftClientRequest.newBuilder()
.setServerId(reply.getServerId())
@@ -854,7 +848,24 @@ class RaftServerImpl implements RaftServer.Division,
.setCallId(reply.getCallId())
.setType(type)
.build();
- return watchAsync(watch).thenApply(r -> reply);
+ return watchAsync(watch).thenApply(watchReply -> combineReplies(reply,
watchReply));
+ }
+
+ private RaftClientReply combineReplies(RaftClientReply reply,
RaftClientReply watchReply) {
+ final RaftClientReply combinedReply = RaftClientReply.newBuilder()
+ .setServerId(getMemberId())
+ // from write reply
+ .setClientId(reply.getClientId())
+ .setCallId(reply.getCallId())
+ .setMessage(reply.getMessage())
+ .setLogIndex(reply.getLogIndex())
+ // from watchReply
+ .setSuccess(watchReply.isSuccess())
+ .setException(watchReply.getException())
+ .setCommitInfos(watchReply.getCommitInfos())
+ .build();
+ LOG.debug("combinedReply={}", combinedReply);
+ return combinedReply;
}
void stepDownOnJvmPause() {
@@ -934,6 +945,19 @@ class RaftServerImpl implements RaftServer.Division,
}
private CompletableFuture<RaftClientReply>
writeAsync(ReferenceCountedObject<RaftClientRequest> requestRef) {
+ final RaftClientRequest request = requestRef.get();
+ final CompletableFuture<RaftClientReply> future =
writeAsyncImpl(requestRef);
+ if (request.is(TypeCase.WRITE)) {
+ // check replication
+ final ReplicationLevel replication =
request.getType().getWrite().getReplication();
+ if (replication != ReplicationLevel.MAJORITY) {
+ return future.thenCompose(r -> waitForReplication(r, replication));
+ }
+ }
+ return future;
+ }
+
+ private CompletableFuture<RaftClientReply>
writeAsyncImpl(ReferenceCountedObject<RaftClientRequest> requestRef) {
final RaftClientRequest request = requestRef.get();
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 1ac704f59..925b8bbad 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -24,6 +24,7 @@ import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
@@ -357,6 +358,38 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
}
}
+ @Test
+ public void testWriteAsyncCustomReplicationLevel() throws Exception {
+ // verify that send(msg, ALL_COMMITTED) would reply with all servers
committed past the log index
+ runWithNewCluster(NUM_SERVERS,
this::runTestWriteAsyncCustomReplicationLevel);
+ }
+
+ void runTestWriteAsyncCustomReplicationLevel(CLUSTER cluster) throws
Exception {
+ final int numMessages = 20;
+ try (RaftClient client = cluster.createClient()) {
+ RaftTestUtil.waitForLeader(cluster);
+
+ // submit some messages
+ for (int i = 0; i < numMessages; i++) {
+ final String s = "" + i;
+ LOG.info("sendAsync with ALL_COMMITTED " + s);
+ client.async().send(new SimpleMessage(s),
ReplicationLevel.ALL_COMMITTED).whenComplete((reply, exception) -> {
+ if (exception != null) {
+ LOG.error("Failed to send message " + s, exception);
+ // reply should be null in case of exception
+ Assert.assertNull(reply);
+ return;
+ }
+ Assert.assertTrue(reply.isSuccess());
+ Assert.assertNull(reply.getException());
+ // verify that all servers have caught up to log index when the
reply is returned
+ reply.getCommitInfos().forEach(commitInfoProto ->
+ Assert.assertTrue(commitInfoProto.getCommitIndex() >=
reply.getLogIndex()));
+ });
+ }
+ }
+ }
+
@Test
public void testRequestTimeout() throws Exception {
final TimeDuration oldExpiryTime =
RaftServerConfigKeys.RetryCache.expiryTime(getProperties());