This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 7015ba2f2 RATIS-1916. OrderAsync does not call handReply. (#948)
7015ba2f2 is described below

commit 7015ba2f274394697dffec417b43374656077d88
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Oct 24 23:17:42 2023 -0700

    RATIS-1916. OrderAsync does not call handReply. (#948)
---
 .../apache/ratis/client/impl/ClientProtoUtils.java | 80 ++++++----------------
 .../org/apache/ratis/client/impl/OrderedAsync.java | 31 +++------
 .../apache/ratis/client/impl/UnorderedAsync.java   |  2 +-
 .../java/org/apache/ratis/util/ProtoUtils.java     |  3 +
 4 files changed, 37 insertions(+), 79 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index b5b1d8abe..3c0f14fb7 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -40,12 +40,10 @@ import org.apache.ratis.proto.RaftProtos.RaftRpcReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
 import org.apache.ratis.proto.RaftProtos.RouteProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
-import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
 import org.apache.ratis.proto.RaftProtos.SnapshotCreateRequestProto;
 import org.apache.ratis.proto.RaftProtos.SnapshotManagementRequestProto;
 import org.apache.ratis.proto.RaftProtos.StateMachineExceptionProto;
 import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
-import org.apache.ratis.proto.RaftProtos.WriteRequestTypeProto;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.GroupInfoReply;
@@ -110,55 +108,30 @@ public interface ClientProtoUtils {
         .setSuccess(success);
   }
 
-  static RaftRpcRequestProto.Builder 
toRaftRpcRequestProtoBuilder(RaftGroupMemberId requestorId, RaftPeerId replyId) 
{
-    return toRaftRpcRequestProtoBuilder(requestorId.getPeerId().toByteString(),
-        replyId.toByteString(), requestorId.getGroupId(), null, false, null, 
null, 0);
-  }
-
-  @SuppressWarnings("parameternumber")
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
-      ByteString requesterId, ByteString replyId, RaftGroupId groupId, Long 
callId, boolean toLeader,
-      SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable, long 
timeoutMs) {
-    if (slidingWindowEntry == null) {
-      slidingWindowEntry = SlidingWindowEntry.getDefaultInstance();
-    }
-
-    RaftRpcRequestProto.Builder b = RaftRpcRequestProto.newBuilder()
-        .setRequestorId(requesterId)
-        .setReplyId(replyId)
-        .setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
-        .setCallId(Optional.ofNullable(callId).orElseGet(CallId::getDefault))
-        .setToLeader(toLeader)
-        .setSlidingWindowEntry(slidingWindowEntry)
-        .setTimeoutMs(timeoutMs);
-
-    if (routingTable != null) {
-      b.setRoutingTable(routingTable.toProto());
-    }
-
-    return b;
+      ByteString requestorId, RaftPeerId replyId, RaftGroupId groupId) {
+    return RaftRpcRequestProto.newBuilder()
+        .setRequestorId(requestorId)
+        .setReplyId(replyId.toByteString())
+        .setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId));
   }
 
-  @SuppressWarnings("parameternumber")
-  static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
-      ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long 
callId, boolean toLeader,
-      SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable, long 
timeoutMs) {
-    return toRaftRpcRequestProtoBuilder(
-        requesterId.toByteString(), replyId.toByteString(), groupId, callId, 
toLeader, slidingWindowEntry, routingTable,
-        timeoutMs);
+  /** For server requests. */
+  static RaftRpcRequestProto.Builder 
toRaftRpcRequestProtoBuilder(RaftGroupMemberId requestorId, RaftPeerId replyId) 
{
+    return 
toRaftRpcRequestProtoBuilder(requestorId.getPeerId().toByteString(), replyId, 
requestorId.getGroupId());
   }
 
-  static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
-      RaftClientRequest request) {
-    return toRaftRpcRequestProtoBuilder(
-        request.getClientId(),
-        request.getServerId(),
-        request.getRaftGroupId(),
-        request.getCallId(),
-        request.isToLeader(),
-        request.getSlidingWindowEntry(),
-        request.getRoutingTable(),
-        request.getTimeoutMs());
+  /** For client requests. */
+  static RaftRpcRequestProto.Builder 
toRaftRpcRequestProtoBuilder(RaftClientRequest request) {
+    final RaftRpcRequestProto.Builder b = toRaftRpcRequestProtoBuilder(
+        request.getClientId().toByteString(), request.getServerId(), 
request.getRaftGroupId());
+
+    
Optional.ofNullable(request.getSlidingWindowEntry()).ifPresent(b::setSlidingWindowEntry);
+    
Optional.ofNullable(request.getRoutingTable()).map(RoutingTable::toProto).ifPresent(b::setRoutingTable);
+
+    return b.setCallId(request.getCallId())
+        .setToLeader(request.isToLeader())
+        .setTimeoutMs(request.getTimeoutMs());
   }
 
   static RaftClientRequest.Type toRaftClientRequestType(RaftClientRequestProto 
p) {
@@ -211,12 +184,14 @@ public interface ClientProtoUtils {
     } else {
       b.setServerId(perrId);
     }
+    if (request.hasSlidingWindowEntry()) {
+      b.setSlidingWindowEntry(request.getSlidingWindowEntry());
+    }
     return b.setClientId(ClientId.valueOf(request.getRequestorId()))
         .setGroupId(ProtoUtils.toRaftGroupId(request.getRaftGroupId()))
         .setCallId(request.getCallId())
         .setMessage(toMessage(p.getMessage()))
         .setType(type)
-        .setSlidingWindowEntry(request.getSlidingWindowEntry())
         .setRoutingTable(getRoutingTable(request))
         .setTimeoutMs(request.getTimeoutMs())
         .build();
@@ -264,17 +239,6 @@ public interface ClientProtoUtils {
     return b.build();
   }
 
-  static RaftClientRequestProto toRaftClientRequestProto(
-      ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId,
-      long seqNum, ByteString content) {
-    return RaftClientRequestProto.newBuilder()
-        .setRpcRequest(toRaftRpcRequestProtoBuilder(
-            clientId, serverId, groupId, callId, false, 
ProtoUtils.toSlidingWindowEntry(seqNum, false), null, 0))
-        .setWrite(WriteRequestTypeProto.getDefaultInstance())
-        .setMessage(toClientMessageEntryProtoBuilder(content))
-        .build();
-  }
-
   static StateMachineExceptionProto.Builder 
toStateMachineExceptionProtoBuilder(StateMachineException e) {
     final Throwable t = e.getCause() != null? e.getCause(): e;
     return StateMachineExceptionProto.newBuilder()
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 5405d4906..a1aa58681 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -23,6 +23,7 @@ import 
org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
 import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
@@ -172,7 +173,11 @@ public final class OrderedAsync {
     ).thenApply(reply -> RaftClientImpl.handleRaftException(reply, 
CompletionException::new)
     ).whenComplete((r, e) -> {
       if (e != null) {
-        LOG.error("Failed to send request, message=" + message, e);
+        if (e.getCause() instanceof AlreadyClosedException) {
+          LOG.error("Failed to send request, message=" + message + " due to " 
+ e);
+        } else {
+          LOG.error("Failed to send request, message=" + message, e);
+        }
       }
       requestSemaphore.release();
     });
@@ -195,17 +200,7 @@ public final class OrderedAsync {
     }
 
     final RetryPolicy retryPolicy = client.getRetryPolicy();
-    sendRequest(pending).thenAccept(reply -> {
-      if (f.isDone()) {
-        return;
-      }
-      if (reply == null) {
-        scheduleWithTimeout(pending, request, retryPolicy, null);
-      } else {
-        client.handleReply(request, reply);
-        f.complete(reply);
-      }
-    }).exceptionally(e -> {
+    sendRequest(pending).exceptionally(e -> {
       if (e instanceof CompletionException) {
         e = JavaUtils.unwrapCompletionException(e);
         scheduleWithTimeout(pending, request, retryPolicy, e);
@@ -235,25 +230,21 @@ public final class OrderedAsync {
 
   private CompletableFuture<RaftClientReply> sendRequest(PendingOrderedRequest 
pending) {
     final RetryPolicy retryPolicy = client.getRetryPolicy();
-    final CompletableFuture<RaftClientReply> f;
     final RaftClientRequest request;
     if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) {
       pending.setFirstRequest();
     }
     request = pending.newRequest();
     LOG.debug("{}: send* {}", client.getId(), request);
-    f = client.getClientRpc().sendRequestAsync(request);
-    return f.thenApply(reply -> {
+    return client.getClientRpc().sendRequestAsync(request).thenApply(reply -> {
       LOG.debug("{}: receive* {}", client.getId(), reply);
+      Objects.requireNonNull(reply, "reply == null");
+      client.handleReply(request, reply);
       getSlidingWindow(request).receiveReply(
           request.getSlidingWindowEntry().getSeqNum(), reply, 
this::sendRequestWithRetry);
       return reply;
     }).exceptionally(e -> {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(client.getId() + ": Failed* " + request, e);
-      } else {
-        LOG.debug("{}: Failed* {} with {}", client.getId(), request, e);
-      }
+      LOG.error(client.getId() + ": Failed* " + request, e);
       e = JavaUtils.unwrapCompletionException(e);
       if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
         pending.incrementExceptionCount(e);
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index b053df172..84b817b58 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -103,7 +103,7 @@ public interface UnorderedAsync {
           if (LOG.isTraceEnabled()) {
             LOG.trace(clientId + ": attempt #" + attemptCount + " failed~ " + 
request, e);
           } else {
-            LOG.debug("{}: attempt #{} failed {} with {}", clientId, 
attemptCount, request, e);
+            LOG.debug("{}: attempt #{} failed {} with {}", clientId, 
attemptCount, request, e.toString());
           }
           e = JavaUtils.unwrapCompletionException(e);
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index e57ae552b..cd22ebe6b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -234,6 +234,9 @@ public interface ProtoUtils {
   }
 
   static String toString(SlidingWindowEntry proto) {
+    if (proto == null) {
+      return null;
+    }
     return proto.getSeqNum() + (proto.getIsFirst()? "*": "");
   }
 

Reply via email to