This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-3.1.1_review in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 53b80c6dc767895805a2376f3a651506a92d0b10 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Wed Jul 17 13:30:07 2024 +0800 RATIS-1071. NettyClientRpc supports sendRequestAsync. Contributed by Rui Wang and Tsz-Wo Nicholas Sze. (#1122) --- .../java/org/apache/ratis/netty/NettyRpcProxy.java | 6 ++ .../apache/ratis/netty/client/NettyClientRpc.java | 67 ++++++++++++++++------ .../apache/ratis/netty/TestRaftAsyncWithNetty.java | 25 ++++++++ 3 files changed, 81 insertions(+), 17 deletions(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java index b9788a8bb..41269f76e 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java @@ -176,6 +176,12 @@ public class NettyRpcProxy implements Closeable { connection.close(); } + public CompletableFuture<RaftNettyServerReplyProto> sendAsync(RaftNettyServerRequestProto proto) { + final CompletableFuture<RaftNettyServerReplyProto> reply = new CompletableFuture<>(); + connection.offer(proto, reply); + return reply; + } + public RaftNettyServerReplyProto send( RaftRpcRequestProto request, RaftNettyServerRequestProto proto) throws IOException { diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java index c816e29ee..26ac41f7d 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java @@ -28,71 +28,104 @@ import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto; import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto; import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto; import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto; +import org.apache.ratis.util.JavaUtils; import java.io.IOException; +import java.util.concurrent.CompletableFuture; public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> { public NettyClientRpc(ClientId clientId, RaftProperties properties) { super(new NettyRpcProxy.PeerMap(clientId.toString(), properties)); } + @Override + public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest request) { + final RaftPeerId serverId = request.getServerId(); + try { + final NettyRpcProxy proxy = getProxies().getProxy(serverId); + final RaftNettyServerRequestProto serverRequestProto = buildRequestProto(request); + return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> { + if (request instanceof GroupListRequest) { + return ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply()); + } else if (request instanceof GroupInfoRequest) { + return ClientProtoUtils.toGroupInfoReply(replyProto.getGroupInfoReply()); + } else { + return ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply()); + } + }); + } catch (Throwable e) { + return JavaUtils.completeExceptionally(e); + } + } + @Override public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { final RaftPeerId serverId = request.getServerId(); final NettyRpcProxy proxy = getProxies().getProxy(serverId); + final RaftNettyServerRequestProto serverRequestProto = buildRequestProto(request); + final RaftRpcRequestProto rpcRequest = getRpcRequestProto(serverRequestProto); + if (request instanceof GroupListRequest) { + return ClientProtoUtils.toGroupListReply( + proxy.send(rpcRequest, serverRequestProto).getGroupListReply()); + } else if (request instanceof GroupInfoRequest) { + return ClientProtoUtils.toGroupInfoReply( + proxy.send(rpcRequest, serverRequestProto).getGroupInfoReply()); + } else { + return ClientProtoUtils.toRaftClientReply( + proxy.send(rpcRequest, serverRequestProto).getRaftClientReply()); + } + } + + private RaftNettyServerRequestProto buildRequestProto(RaftClientRequest request) { final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder(); - final RaftRpcRequestProto rpcRequest; if (request instanceof GroupManagementRequest) { final GroupManagementRequestProto proto = ClientProtoUtils.toGroupManagementRequestProto( (GroupManagementRequest)request); b.setGroupManagementRequest(proto); - rpcRequest = proto.getRpcRequest(); } else if (request instanceof SetConfigurationRequest) { final SetConfigurationRequestProto proto = ClientProtoUtils.toSetConfigurationRequestProto( (SetConfigurationRequest)request); b.setSetConfigurationRequest(proto); - rpcRequest = proto.getRpcRequest(); } else if (request instanceof GroupListRequest) { final RaftProtos.GroupListRequestProto proto = ClientProtoUtils.toGroupListRequestProto( (GroupListRequest)request); b.setGroupListRequest(proto); - rpcRequest = proto.getRpcRequest(); } else if (request instanceof GroupInfoRequest) { final RaftProtos.GroupInfoRequestProto proto = ClientProtoUtils.toGroupInfoRequestProto( (GroupInfoRequest)request); b.setGroupInfoRequest(proto); - rpcRequest = proto.getRpcRequest(); } else if (request instanceof TransferLeadershipRequest) { final RaftProtos.TransferLeadershipRequestProto proto = ClientProtoUtils.toTransferLeadershipRequestProto( (TransferLeadershipRequest)request); b.setTransferLeadershipRequest(proto); - rpcRequest = proto.getRpcRequest(); } else if (request instanceof SnapshotManagementRequest) { final RaftProtos.SnapshotManagementRequestProto proto = ClientProtoUtils.toSnapshotManagementRequestProto( (SnapshotManagementRequest) request); b.setSnapshotManagementRequest(proto); - rpcRequest = proto.getRpcRequest(); } else if (request instanceof LeaderElectionManagementRequest) { final RaftProtos.LeaderElectionManagementRequestProto proto = ClientProtoUtils.toLeaderElectionManagementRequestProto( (LeaderElectionManagementRequest) request); b.setLeaderElectionManagementRequest(proto); - rpcRequest = proto.getRpcRequest(); } else { final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request); b.setRaftClientRequest(proto); - rpcRequest = proto.getRpcRequest(); } - if (request instanceof GroupListRequest) { - return ClientProtoUtils.toGroupListReply( - proxy.send(rpcRequest, b.build()).getGroupListReply()); - } else if (request instanceof GroupInfoRequest) { - return ClientProtoUtils.toGroupInfoReply( - proxy.send(rpcRequest, b.build()).getGroupInfoReply()); + return b.build(); + } + + private RaftRpcRequestProto getRpcRequestProto(RaftNettyServerRequestProto serverRequestProto) { + if (serverRequestProto.hasGroupManagementRequest()) { + return serverRequestProto.getGroupManagementRequest().getRpcRequest(); + } else if (serverRequestProto.hasSetConfigurationRequest()) { + return serverRequestProto.getSetConfigurationRequest().getRpcRequest(); + } else if (serverRequestProto.hasGroupListRequest()) { + return serverRequestProto.getGroupListRequest().getRpcRequest(); + } else if (serverRequestProto.hasGroupInfoRequest()) { + return serverRequestProto.getGroupInfoRequest().getRpcRequest(); } else { - return ClientProtoUtils.toRaftClientReply( - proxy.send(rpcRequest, b.build()).getRaftClientReply()); + return serverRequestProto.getRaftClientRequest().getRpcRequest(); } } } diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java new file mode 100644 index 000000000..ebaa33d50 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import org.apache.ratis.RaftAsyncTests; + +public class TestRaftAsyncWithNetty + extends RaftAsyncTests<MiniRaftClusterWithNetty> + implements MiniRaftClusterWithNetty.FactoryGet { +} \ No newline at end of file
