This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new bd4ab145d RATIS-2102. AsyncApi#send() is not handling retry and reply 
correctly for replication levels higher than MAJORITY (#1104)
bd4ab145d is described below

commit bd4ab145d6b1a72d8ed84eab918cda4e61f08681
Author: Siyao Meng <[email protected]>
AuthorDate: Wed May 29 11:13:47 2024 -0700

    RATIS-2102. AsyncApi#send() is not handling retry and reply correctly for 
replication levels higher than MAJORITY (#1104)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 46 ++++++++++++++++------
 .../test/java/org/apache/ratis/RaftAsyncTests.java | 33 ++++++++++++++++
 2 files changed, 68 insertions(+), 11 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 4512a2c22..cba3bba1c 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -832,20 +832,14 @@ class RaftServerImpl implements RaftServer.Division,
       leaderState.notifySenders();
     }
 
-    final CompletableFuture<RaftClientReply> future = pending.getFuture();
-    if (request.is(TypeCase.WRITE)) {
-      // check replication
-      final ReplicationLevel replication = 
request.getType().getWrite().getReplication();
-      if (replication != ReplicationLevel.MAJORITY) {
-        return future.thenCompose(reply -> waitForReplication(reply, 
replication));
-      }
-    }
-
-    return future;
+    return pending.getFuture();
   }
 
   /** Wait until the given replication requirement is satisfied. */
   private CompletableFuture<RaftClientReply> 
waitForReplication(RaftClientReply reply, ReplicationLevel replication) {
+    if (!reply.isSuccess()) {
+      return CompletableFuture.completedFuture(reply);
+    }
     final RaftClientRequest.Type type = 
RaftClientRequest.watchRequestType(reply.getLogIndex(), replication);
     final RaftClientRequest watch = RaftClientRequest.newBuilder()
         .setServerId(reply.getServerId())
@@ -854,7 +848,24 @@ class RaftServerImpl implements RaftServer.Division,
         .setCallId(reply.getCallId())
         .setType(type)
         .build();
-    return watchAsync(watch).thenApply(r -> reply);
+    return watchAsync(watch).thenApply(watchReply -> combineReplies(reply, 
watchReply));
+  }
+
+  private RaftClientReply combineReplies(RaftClientReply reply, 
RaftClientReply watchReply) {
+    final RaftClientReply combinedReply = RaftClientReply.newBuilder()
+        .setServerId(getMemberId())
+        // from write reply
+        .setClientId(reply.getClientId())
+        .setCallId(reply.getCallId())
+        .setMessage(reply.getMessage())
+        .setLogIndex(reply.getLogIndex())
+        // from watchReply
+        .setSuccess(watchReply.isSuccess())
+        .setException(watchReply.getException())
+        .setCommitInfos(watchReply.getCommitInfos())
+        .build();
+    LOG.debug("combinedReply={}", combinedReply);
+    return combinedReply;
   }
 
   void stepDownOnJvmPause() {
@@ -934,6 +945,19 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   private CompletableFuture<RaftClientReply> 
writeAsync(ReferenceCountedObject<RaftClientRequest> requestRef) {
+    final RaftClientRequest request = requestRef.get();
+    final CompletableFuture<RaftClientReply> future = 
writeAsyncImpl(requestRef);
+    if (request.is(TypeCase.WRITE)) {
+      // check replication
+      final ReplicationLevel replication = 
request.getType().getWrite().getReplication();
+      if (replication != ReplicationLevel.MAJORITY) {
+        return future.thenCompose(r -> waitForReplication(r, replication));
+      }
+    }
+    return future;
+  }
+
+  private CompletableFuture<RaftClientReply> 
writeAsyncImpl(ReferenceCountedObject<RaftClientRequest> requestRef) {
     final RaftClientRequest request = requestRef.get();
     final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
     if (reply != null) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 1ac704f59..925b8bbad 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -24,6 +24,7 @@ import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
@@ -357,6 +358,38 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
     }
   }
 
+  @Test
+  public void testWriteAsyncCustomReplicationLevel() throws Exception {
+    // verify that send(msg, ALL_COMMITTED) would reply with all servers 
committed past the log index
+    runWithNewCluster(NUM_SERVERS, 
this::runTestWriteAsyncCustomReplicationLevel);
+  }
+
+  void runTestWriteAsyncCustomReplicationLevel(CLUSTER cluster) throws 
Exception {
+    final int numMessages = 20;
+    try (RaftClient client = cluster.createClient()) {
+      RaftTestUtil.waitForLeader(cluster);
+
+      // submit some messages
+      for (int i = 0; i < numMessages; i++) {
+        final String s = "" + i;
+        LOG.info("sendAsync with ALL_COMMITTED " + s);
+        client.async().send(new SimpleMessage(s), 
ReplicationLevel.ALL_COMMITTED).whenComplete((reply, exception) -> {
+          if (exception != null) {
+            LOG.error("Failed to send message " + s, exception);
+            // reply should be null in case of exception
+            Assert.assertNull(reply);
+            return;
+          }
+          Assert.assertTrue(reply.isSuccess());
+          Assert.assertNull(reply.getException());
+          // verify that all servers have caught up to log index when the 
reply is returned
+          reply.getCommitInfos().forEach(commitInfoProto ->
+              Assert.assertTrue(commitInfoProto.getCommitIndex() >= 
reply.getLogIndex()));
+        });
+      }
+    }
+  }
+
   @Test
   public void testRequestTimeout() throws Exception {
     final TimeDuration oldExpiryTime = 
RaftServerConfigKeys.RetryCache.expiryTime(getProperties());

Reply via email to