This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 7825b4f03 RATIS-1071. NettyClientRpc supports sendRequestAsync. Contributed by Rui Wang and Tsz-Wo Nicholas Sze. (#1122)
7825b4f03 is described below

commit 7825b4f032653efc0cf75ff7e650f8055707f9d1
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Jul 17 13:30:07 2024 +0800

    RATIS-1071. NettyClientRpc supports sendRequestAsync. Contributed by Rui Wang and Tsz-Wo Nicholas Sze. (#1122)
---
 .../java/org/apache/ratis/netty/NettyRpcProxy.java |  6 ++
 .../apache/ratis/netty/client/NettyClientRpc.java  | 67 ++++++++++++++++------
 .../apache/ratis/netty/TestRaftAsyncWithNetty.java | 25 ++++++++
 3 files changed, 81 insertions(+), 17 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index b9788a8bb..41269f76e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -176,6 +176,12 @@ public class NettyRpcProxy implements Closeable {
     connection.close();
   }
 
+  public CompletableFuture<RaftNettyServerReplyProto> sendAsync(RaftNettyServerRequestProto proto) {
+    final CompletableFuture<RaftNettyServerReplyProto> reply = new CompletableFuture<>();
+    connection.offer(proto, reply);
+    return reply;
+  }
+
   public RaftNettyServerReplyProto send(
       RaftRpcRequestProto request, RaftNettyServerRequestProto proto)
       throws IOException {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
index c816e29ee..26ac41f7d 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
@@ -28,71 +28,104 @@ import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
 import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
 import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.util.JavaUtils;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 
 public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
   public NettyClientRpc(ClientId clientId, RaftProperties properties) {
     super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
   }
 
+  @Override
+  public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest request) {
+    final RaftPeerId serverId = request.getServerId();
+    try {
+      final NettyRpcProxy proxy = getProxies().getProxy(serverId);
+      final RaftNettyServerRequestProto serverRequestProto = buildRequestProto(request);
+      return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
+        if (request instanceof GroupListRequest) {
+          return ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply());
+        } else if (request instanceof GroupInfoRequest) {
+          return ClientProtoUtils.toGroupInfoReply(replyProto.getGroupInfoReply());
+        } else {
+          return ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply());
+        }
+      });
+    } catch (Throwable e) {
+      return JavaUtils.completeExceptionally(e);
+    }
+  }
+
   @Override
   public RaftClientReply sendRequest(RaftClientRequest request) throws IOException {
     final RaftPeerId serverId = request.getServerId();
     final NettyRpcProxy proxy = getProxies().getProxy(serverId);
 
+    final RaftNettyServerRequestProto serverRequestProto = buildRequestProto(request);
+    final RaftRpcRequestProto rpcRequest = getRpcRequestProto(serverRequestProto);
+    if (request instanceof GroupListRequest) {
+      return ClientProtoUtils.toGroupListReply(
+          proxy.send(rpcRequest, serverRequestProto).getGroupListReply());
+    } else if (request instanceof GroupInfoRequest) {
+      return ClientProtoUtils.toGroupInfoReply(
+          proxy.send(rpcRequest, serverRequestProto).getGroupInfoReply());
+    } else {
+      return ClientProtoUtils.toRaftClientReply(
+          proxy.send(rpcRequest, serverRequestProto).getRaftClientReply());
+    }
+  }
+
+  private RaftNettyServerRequestProto buildRequestProto(RaftClientRequest request) {
     final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder();
-    final RaftRpcRequestProto rpcRequest;
     if (request instanceof GroupManagementRequest) {
       final GroupManagementRequestProto proto = ClientProtoUtils.toGroupManagementRequestProto(
           (GroupManagementRequest)request);
       b.setGroupManagementRequest(proto);
-      rpcRequest = proto.getRpcRequest();
     } else if (request instanceof SetConfigurationRequest) {
       final SetConfigurationRequestProto proto = ClientProtoUtils.toSetConfigurationRequestProto(
           (SetConfigurationRequest)request);
       b.setSetConfigurationRequest(proto);
-      rpcRequest = proto.getRpcRequest();
     } else if (request instanceof GroupListRequest) {
       final RaftProtos.GroupListRequestProto proto = ClientProtoUtils.toGroupListRequestProto(
           (GroupListRequest)request);
       b.setGroupListRequest(proto);
-      rpcRequest = proto.getRpcRequest();
     } else if (request instanceof GroupInfoRequest) {
       final RaftProtos.GroupInfoRequestProto proto = ClientProtoUtils.toGroupInfoRequestProto(
           (GroupInfoRequest)request);
       b.setGroupInfoRequest(proto);
-      rpcRequest = proto.getRpcRequest();
     } else if (request instanceof TransferLeadershipRequest) {
       final RaftProtos.TransferLeadershipRequestProto proto = ClientProtoUtils.toTransferLeadershipRequestProto(
           (TransferLeadershipRequest)request);
       b.setTransferLeadershipRequest(proto);
-      rpcRequest = proto.getRpcRequest();
     } else if (request instanceof SnapshotManagementRequest) {
       final RaftProtos.SnapshotManagementRequestProto proto = ClientProtoUtils.toSnapshotManagementRequestProto(
           (SnapshotManagementRequest) request);
       b.setSnapshotManagementRequest(proto);
-      rpcRequest = proto.getRpcRequest();
     } else if (request instanceof LeaderElectionManagementRequest) {
       final RaftProtos.LeaderElectionManagementRequestProto proto =
           ClientProtoUtils.toLeaderElectionManagementRequestProto(
           (LeaderElectionManagementRequest) request);
       b.setLeaderElectionManagementRequest(proto);
-      rpcRequest = proto.getRpcRequest();
     } else {
       final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request);
       b.setRaftClientRequest(proto);
-      rpcRequest = proto.getRpcRequest();
     }
-    if (request instanceof GroupListRequest) {
-      return ClientProtoUtils.toGroupListReply(
-          proxy.send(rpcRequest, b.build()).getGroupListReply());
-    } else if (request instanceof GroupInfoRequest) {
-      return ClientProtoUtils.toGroupInfoReply(
-          proxy.send(rpcRequest, b.build()).getGroupInfoReply());
+    return b.build();
+  }
+
+  private RaftRpcRequestProto getRpcRequestProto(RaftNettyServerRequestProto serverRequestProto) {
+    if (serverRequestProto.hasGroupManagementRequest()) {
+      return serverRequestProto.getGroupManagementRequest().getRpcRequest();
+    } else if (serverRequestProto.hasSetConfigurationRequest()) {
+      return serverRequestProto.getSetConfigurationRequest().getRpcRequest();
+    } else if (serverRequestProto.hasGroupListRequest()) {
+      return serverRequestProto.getGroupListRequest().getRpcRequest();
+    } else if (serverRequestProto.hasGroupInfoRequest()) {
+      return serverRequestProto.getGroupInfoRequest().getRpcRequest();
     } else {
-      return ClientProtoUtils.toRaftClientReply(
-          proxy.send(rpcRequest, b.build()).getRaftClientReply());
+      return serverRequestProto.getRaftClientRequest().getRpcRequest();
     }
   }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
new file mode 100644
index 000000000..ebaa33d50
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.RaftAsyncTests;
+
+public class TestRaftAsyncWithNetty
+    extends RaftAsyncTests<MiniRaftClusterWithNetty>
+    implements MiniRaftClusterWithNetty.FactoryGet {
+}
\ No newline at end of file

Reply via email to