Repository: incubator-ratis Updated Branches: refs/heads/master 8ef7b4852 -> 39916f231
RATIS-338. ServerInformationReply should include multiple groups. Contributed by Sergey Soldatov Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/39916f23 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/39916f23 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/39916f23 Branch: refs/heads/master Commit: 39916f231499351c35dce64a1b5434ade1a5cf5f Parents: 8ef7b48 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Mon Oct 22 23:15:58 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Mon Oct 22 23:15:58 2018 +0800 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 7 +- .../ratis/client/impl/ClientProtoUtils.java | 77 ++++++++++++++++---- .../ratis/client/impl/RaftClientImpl.java | 16 ++-- .../protocol/AdminAsynchronousProtocol.java | 7 +- .../apache/ratis/protocol/AdminProtocol.java | 4 +- .../apache/ratis/protocol/GroupInfoReply.java | 64 ++++++++++++++++ .../apache/ratis/protocol/GroupInfoRequest.java | 29 ++++++++ .../apache/ratis/protocol/GroupListReply.java | 43 +++++++++++ .../apache/ratis/protocol/GroupListRequest.java | 29 ++++++++ .../ratis/protocol/ServerInformationReply.java | 63 ---------------- .../protocol/ServerInformationRequest.java | 29 -------- .../grpc/client/GrpcClientProtocolClient.java | 11 ++- .../apache/ratis/grpc/client/GrpcClientRpc.java | 23 +++--- .../grpc/server/GrpcAdminProtocolService.java | 23 ++++-- ...nedClientProtocolClientSideTranslatorPB.java | 23 +++--- ...nedClientProtocolServerSideTranslatorPB.java | 32 ++++++-- .../ratis/hadooprpc/client/HadoopClientRpc.java | 6 +- .../ratis/netty/client/NettyClientRpc.java | 22 ++++-- .../ratis/netty/server/NettyRpcService.java | 22 ++++-- ratis-proto/src/main/proto/Grpc.proto | 7 +- ratis-proto/src/main/proto/Hadoop.proto | 7 +- ratis-proto/src/main/proto/Netty.proto | 8 +- ratis-proto/src/main/proto/Raft.proto | 16 +++- .../org/apache/ratis/server/RaftServer.java | 3 + .../ratis/server/impl/RaftServerImpl.java | 22 ++++-- .../ratis/server/impl/RaftServerProxy.java | 27 +++++-- .../server/impl/ServerInformationBaseTest.java | 53 ++++++++++---- .../server/simulation/SimulatedServerRpc.java | 7 +- 28 files changed, 469 insertions(+), 211 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 0b8fdb0..3929164 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -103,8 +103,11 @@ public interface RaftClient extends Closeable { /** Send groupRemove request to the given server (not the raft service). */ RaftClientReply groupRemove(RaftGroupId groupId, boolean deleteDirectory, RaftPeerId server) throws IOException; - /** Send serverInformation request to the given server.*/ - RaftClientReply serverInformation(RaftPeerId server) throws IOException; + /** Send getGroups request to the given server.*/ + RaftClientReply getGroups(RaftPeerId server) throws IOException; + + /** Send getGroupInfo request to the given server.*/ + RaftClientReply getGroupInfo(RaftGroupId group, RaftPeerId server) throws IOException; /** @return a {@link Builder}. */ static Builder newBuilder() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index a9e509a..5b03145 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -24,6 +24,7 @@ import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.ReflectionUtils; import java.util.Arrays; +import java.util.stream.Collectors; import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION; import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTREPLICATEDEXCEPTION; @@ -184,20 +185,34 @@ public interface ClientProtoUtils { return b.build(); } - static ServerInformationReplyProto toServerInformationReplyProto( - ServerInformationReply reply) { - final ServerInformationReplyProto.Builder b = - ServerInformationReplyProto.newBuilder(); + static GroupListReplyProto toGroupListReplyProto( + GroupListReply reply) { + final GroupListReplyProto.Builder b = + GroupListReplyProto.newBuilder(); + if (reply != null) { + b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getClientId().toByteString(), + reply.getServerId().toByteString(), reply.getRaftGroupId(), + reply.getCallId(), reply.isSuccess())); + if (reply.getGroupIds() != null) { + reply.getGroupIds().forEach(groupId -> b.addGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))); + } + } + return b.build(); + } + + static GroupInfoReplyProto toGroupInfoReplyProto(GroupInfoReply reply) { + final GroupInfoReplyProto.Builder b = + GroupInfoReplyProto.newBuilder(); if (reply != null) { b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getClientId().toByteString(), reply.getServerId().toByteString(), reply.getRaftGroupId(), reply.getCallId(), reply.isSuccess())); if (reply.getRaftGroupId() != null) { b.setGroup(ProtoUtils.toRaftGroupProtoBuilder(reply.getGroup())); + b.setIsRaftStorageHealthy(reply.isRaftStorageHealthy()); + b.setRole(reply.getRoleInfoProto()); + ProtoUtils.addCommitInfos(reply.getCommitInfos(), i -> b.addCommitInfos(i)); } - b.setIsRaftStorageHealthy(reply.isRaftStorageHealthy()); - b.setRole(reply.getRoleInfoProto()); - ProtoUtils.addCommitInfos(reply.getCommitInfos(), i -> b.addCommitInfos(i)); } return b.build(); } @@ -233,15 +248,26 @@ public interface ClientProtoUtils { replyProto.getLogIndex(), replyProto.getCommitInfosList()); } - static ServerInformationReply toServerInformationReply( - ServerInformationReplyProto replyProto) { + static GroupListReply toGroupListReply( + GroupListReplyProto replyProto) { + final RaftRpcReplyProto rp = replyProto.getRpcReply(); + ClientId clientId = ClientId.valueOf(rp.getRequestorId()); + final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId()); + final Iterable<RaftGroupId> groupInfos = replyProto.getGroupIdList().stream() + .map(id -> ProtoUtils.toRaftGroupId(id)).collect(Collectors.toList()); + return new GroupListReply(clientId, RaftPeerId.valueOf(rp.getReplyId()), + groupId, rp.getCallId(), rp.getSuccess(), groupInfos); + } + + static GroupInfoReply toGroupInfoReply( + GroupInfoReplyProto replyProto) { final RaftRpcReplyProto rp = replyProto.getRpcReply(); ClientId clientId = ClientId.valueOf(rp.getRequestorId()); final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId()); final RaftGroup raftGroup = ProtoUtils.toRaftGroup(replyProto.getGroup()); RoleInfoProto role = replyProto.getRole(); boolean isRaftStorageHealthy = replyProto.getIsRaftStorageHealthy(); - return new ServerInformationReply(clientId, RaftPeerId.valueOf(rp.getReplyId()), + return new GroupInfoReply(clientId, RaftPeerId.valueOf(rp.getReplyId()), groupId, rp.getCallId(), rp.getSuccess(), role, isRaftStorageHealthy, replyProto.getCommitInfosList(), raftGroup); } @@ -317,16 +343,27 @@ public interface ClientProtoUtils { } } - static ServerInformationRequest toServerInformationRequest( - ServerInformationRequestProto p) { + static GroupInfoRequest toGroupInfoRequest( + GroupInfoRequestProto p) { + final RaftRpcRequestProto m = p.getRpcRequest(); + return new GroupInfoRequest( + ClientId.valueOf(m.getRequestorId()), + RaftPeerId.valueOf(m.getReplyId()), + ProtoUtils.toRaftGroupId(m.getRaftGroupId()), + m.getCallId()); + } + + static GroupListRequest toGroupListRequest( + GroupListRequestProto p) { final RaftRpcRequestProto m = p.getRpcRequest(); - return new ServerInformationRequest( + return new GroupListRequest( ClientId.valueOf(m.getRequestorId()), RaftPeerId.valueOf(m.getReplyId()), ProtoUtils.toRaftGroupId(m.getRaftGroupId()), m.getCallId()); } + static GroupManagementRequestProto toGroupManagementRequestProto(GroupManagementRequest request) { final GroupManagementRequestProto.Builder b = GroupManagementRequestProto.newBuilder() .setRpcRequest(toRaftRpcRequestProtoBuilder(request)); @@ -345,9 +382,16 @@ public interface ClientProtoUtils { return b.build(); } - static ServerInformationRequestProto toServerInformationRequestProto( - ServerInformationRequest request) { - return ServerInformationRequestProto.newBuilder() + static GroupInfoRequestProto toGroupInfoRequestProto( + GroupInfoRequest request) { + return GroupInfoRequestProto.newBuilder() + .setRpcRequest(toRaftRpcRequestProtoBuilder(request)) + .build(); + } + + static GroupListRequestProto toGroupListRequestProto( + GroupListRequest request) { + return GroupListRequestProto.newBuilder() .setRpcRequest(toRaftRpcRequestProtoBuilder(request)) .build(); } @@ -363,4 +407,5 @@ public interface ClientProtoUtils { return ClientId.valueOf(rpc.getRequestorId()) + "<-" + rpc.getReplyId().toStringUtf8() + "#" + rpc.getCallId(); } + } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 93c5caa..c49a360 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -29,9 +29,7 @@ import org.apache.ratis.util.*; import java.io.IOException; import java.io.InterruptedIOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Objects; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -245,14 +243,22 @@ final class RaftClientImpl implements RaftClient { } @Override - public RaftClientReply serverInformation(RaftPeerId server) + public RaftClientReply getGroups(RaftPeerId server) throws IOException { Objects.requireNonNull(server, "server == null"); - return sendRequest(new ServerInformationRequest(clientId, server, + return sendRequest(new GroupListRequest(clientId, server, groupId, nextCallId())); } + @Override + public RaftClientReply getGroupInfo(RaftGroupId raftGroupId, RaftPeerId server) throws IOException { + Objects.requireNonNull(server, "server == null"); + RaftGroupId rgi = raftGroupId == null ? groupId : raftGroupId; + return sendRequest(new GroupInfoRequest(clientId, server, + rgi, nextCallId())); + } + private void addServers(Stream<RaftPeer> peersInNewConf) { clientRpc.addServers( peersInNewConf.filter(p -> !peers.contains(p))::iterator); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java index 6c545a8..4d8bd77 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java @@ -22,8 +22,11 @@ import java.util.concurrent.CompletableFuture; /** Asynchronous version of {@link AdminProtocol}. */ public interface AdminAsynchronousProtocol { - CompletableFuture<ServerInformationReply> getInfoAsync( - ServerInformationRequest request) throws IOException; + CompletableFuture<GroupListReply> getGroupListAsync( + GroupListRequest request) throws IOException; + + CompletableFuture<GroupInfoReply> getGroupInfoAsync( + GroupInfoRequest request) throws IOException; CompletableFuture<RaftClientReply> groupManagementAsync(GroupManagementRequest request); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java index 03bb266..ed9eed7 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java @@ -21,7 +21,9 @@ import java.io.IOException; /** For server administration. */ public interface AdminProtocol { - ServerInformationReply getInfo(ServerInformationRequest request) throws IOException; + GroupListReply getGroupList(GroupListRequest request) throws IOException; + + GroupInfoReply getGroupInfo(GroupInfoRequest request) throws IOException; RaftClientReply groupManagement(GroupManagementRequest request) throws IOException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java new file mode 100644 index 0000000..3a2d782 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import org.apache.ratis.proto.RaftProtos.CommitInfoProto; +import org.apache.ratis.proto.RaftProtos.RoleInfoProto; + +import java.util.Collection; + +/** + * The response of server information request. Sent from server to client. + */ +public class GroupInfoReply extends RaftClientReply { + + private final RaftGroup group; + private final RoleInfoProto roleInfoProto; + private final boolean isRaftStorageHealthy; + + public GroupInfoReply( + RaftClientRequest request, RoleInfoProto roleInfoProto, + boolean isRaftStorageHealthy, Collection<CommitInfoProto> commitInfos, RaftGroup group) { + super(request, commitInfos); + this.roleInfoProto = roleInfoProto; + this.isRaftStorageHealthy = isRaftStorageHealthy; + this.group = group; + } + + public GroupInfoReply( + ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, + long callId, boolean success, RoleInfoProto roleInfoProto, + boolean isRaftStorageHealthy, Collection<CommitInfoProto> commitInfos, RaftGroup group) { + super(clientId, serverId, groupId, callId, success, null, null, 0L, commitInfos); + this.roleInfoProto = roleInfoProto; + this.isRaftStorageHealthy = isRaftStorageHealthy; + this.group = group; + } + + public RaftGroup getGroup() { + return group; + } + + public RoleInfoProto getRoleInfoProto() { + return roleInfoProto; + } + + public boolean isRaftStorageHealthy() { + return isRaftStorageHealthy; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java new file mode 100644 index 0000000..c9a4469 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +/** + * Client sends this request to a server to request for the information about + * the server itself. + */ +public class GroupInfoRequest extends RaftClientRequest { + public GroupInfoRequest(ClientId clientId, RaftPeerId serverId, + RaftGroupId groupId, long callId) { + super(clientId, serverId, groupId, callId, 0L, null, RaftClientRequest.readRequestType()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListReply.java new file mode 100644 index 0000000..77dfdf2 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListReply.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +/** + * The response of server information request. Sent from server to client. + */ +public class GroupListReply extends RaftClientReply { + + private final Iterable<RaftGroupId> groupIds; + + public GroupListReply( + RaftClientRequest request, Iterable<RaftGroupId> groupIds) { + super(request, null); + this.groupIds = groupIds; + } + + public GroupListReply( + ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, + long callId, boolean success, Iterable<RaftGroupId> groupIds) { + super(clientId, serverId, groupId, callId, success, null, null, 0L, null); + this.groupIds = groupIds; + } + + public Iterable<RaftGroupId> getGroupIds() { + return groupIds; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java new file mode 100644 index 0000000..d661f52 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +/** + * Client sends this request to a server to request for the information about + * the server itself. + */ +public class GroupListRequest extends RaftClientRequest { + public GroupListRequest(ClientId clientId, RaftPeerId serverId, + RaftGroupId groupId, long callId) { + super(clientId, serverId, groupId, callId, 0L, null, RaftClientRequest.readRequestType()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java deleted file mode 100644 index 45a94ea..0000000 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.protocol; - -import org.apache.ratis.proto.RaftProtos.RoleInfoProto; -import org.apache.ratis.proto.RaftProtos.CommitInfoProto; - -import java.util.Collection; - -/** - * The response of server information request. Sent from server to client. - */ -public class ServerInformationReply extends RaftClientReply { - private final RaftGroup group; - private final RoleInfoProto roleInfoProto; - private final boolean isRaftStorageHealthy; - - public ServerInformationReply( - RaftClientRequest request, RoleInfoProto roleInfoProto, - boolean isRaftStorageHealthy, Collection<CommitInfoProto> commitInfos, RaftGroup group) { - super(request, commitInfos); - this.roleInfoProto = roleInfoProto; - this.isRaftStorageHealthy = isRaftStorageHealthy; - this.group = group; - } - - public ServerInformationReply( - ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, - long callId, boolean success, RoleInfoProto roleInfoProto, - boolean isRaftStorageHealthy, Collection<CommitInfoProto> commitInfos, RaftGroup group) { - super(clientId, serverId, groupId, callId, success, null, null, 0L, commitInfos); - this.roleInfoProto = roleInfoProto; - this.isRaftStorageHealthy = isRaftStorageHealthy; - this.group = group; - } - - public RaftGroup getGroup() { - return group; - } - - public RoleInfoProto getRoleInfoProto() { - return roleInfoProto; - } - - public boolean isRaftStorageHealthy() { - return isRaftStorageHealthy; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationRequest.java deleted file mode 100644 index 6259332..0000000 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationRequest.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.protocol; - -/** - * Client sends this request to a server to request for the information about - * the server itself. - */ -public class ServerInformationRequest extends RaftClientRequest { - public ServerInformationRequest(ClientId clientId, RaftPeerId serverId, - RaftGroupId groupId, long callId) { - super(clientId, serverId, groupId, callId, 0L, null, RaftClientRequest.readRequestType()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java index 936ea5f..7aaf4a8 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java @@ -105,12 +105,19 @@ public class GrpcClientProtocolClient implements Closeable { .groupManagement(request)); } - ServerInformationReplyProto serverInformation(ServerInformationRequestProto request) { + GroupListReplyProto groupList(GroupListRequestProto request) { return adminBlockingStub .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) - .serverInformation(request); + .groupList(request); } + GroupInfoReplyProto groupInfo(GroupInfoRequestProto request) { + return adminBlockingStub + .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) + .groupInfo(request); + } + + RaftClientReplyProto setConfiguration( SetConfigurationRequestProto request) throws IOException { return blockingCall(() -> blockingStub http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index c1f3075..191aaca 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -22,18 +22,13 @@ import org.apache.ratis.client.impl.RaftClientRpcWithProxy; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcUtil; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.GroupManagementRequest; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.protocol.ServerInformationRequest; -import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.protocol.*; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; +import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto; +import org.apache.ratis.proto.RaftProtos.GroupListRequestProto; import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto; import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto; -import org.apache.ratis.proto.RaftProtos.ServerInformationRequestProto; import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; @@ -83,10 +78,14 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie final SetConfigurationRequestProto setConf = ClientProtoUtils.toSetConfigurationRequestProto( (SetConfigurationRequest) request); return ClientProtoUtils.toRaftClientReply(proxy.setConfiguration(setConf)); - } else if (request instanceof ServerInformationRequest){ - final ServerInformationRequestProto proto = ClientProtoUtils.toServerInformationRequestProto( - (ServerInformationRequest) request); - return ClientProtoUtils.toServerInformationReply(proxy.serverInformation(proto)); + } else if (request instanceof GroupListRequest){ + final GroupListRequestProto proto = ClientProtoUtils.toGroupListRequestProto( + (GroupListRequest) request); + return ClientProtoUtils.toGroupListReply(proxy.groupList(proto)); + } else if (request instanceof GroupInfoRequest){ + final GroupInfoRequestProto proto = ClientProtoUtils.toGroupInfoRequestProto( + (GroupInfoRequest) request); + return ClientProtoUtils.toGroupInfoReply(proxy.groupInfo(proto)); } else { final CompletableFuture<RaftClientReply> f = sendRequest(request, proxy); // TODO: timeout support http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java index 4fe4c1a..f2bb8f6 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java @@ -19,14 +19,14 @@ package org.apache.ratis.grpc.server; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.proto.RaftProtos.*; import org.apache.ratis.protocol.AdminAsynchronousProtocol; +import org.apache.ratis.protocol.GroupInfoRequest; +import org.apache.ratis.protocol.GroupListRequest; import org.apache.ratis.protocol.GroupManagementRequest; -import org.apache.ratis.protocol.ServerInformationRequest; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto; -import org.apache.ratis.proto.RaftProtos.ServerInformationReplyProto; -import org.apache.ratis.proto.RaftProtos.ServerInformationRequestProto; import org.apache.ratis.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceImplBase; public class GrpcAdminProtocolService extends AdminProtocolServiceImplBase { @@ -44,10 +44,17 @@ public class GrpcAdminProtocolService extends AdminProtocolServiceImplBase { } @Override - public void serverInformation(ServerInformationRequestProto proto, - StreamObserver<ServerInformationReplyProto> responseObserver) { - final ServerInformationRequest request = ClientProtoUtils.toServerInformationRequest(proto); - GrpcUtil.asyncCall(responseObserver, () -> protocol.getInfoAsync(request), - ClientProtoUtils::toServerInformationReplyProto); + public void groupList(GroupListRequestProto proto, + StreamObserver<GroupListReplyProto> responseObserver) { + final GroupListRequest request = ClientProtoUtils.toGroupListRequest(proto); + GrpcUtil.asyncCall(responseObserver, () -> protocol.getGroupListAsync(request), + ClientProtoUtils::toGroupListReplyProto); + } + + @Override + public void groupInfo(GroupInfoRequestProto proto, StreamObserver<GroupInfoReplyProto> responseObserver) { + final GroupInfoRequest request = ClientProtoUtils.toGroupInfoRequest(proto); + GrpcUtil.asyncCall(responseObserver, () -> protocol.getGroupInfoAsync(request), + ClientProtoUtils::toGroupInfoReplyProto); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java index 053df49..fc28a4e 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java @@ -21,12 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.hadooprpc.Proxy; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.GroupManagementRequest; -import org.apache.ratis.protocol.ServerInformationRequest; -import org.apache.ratis.protocol.ServerInformationReply; -import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.protocol.*; import org.apache.ratis.thirdparty.com.google.protobuf.ServiceException; import org.apache.ratis.util.CheckedFunction; import org.apache.ratis.util.ProtoUtils; @@ -75,11 +70,19 @@ public class CombinedClientProtocolClientSideTranslatorPB } @Override - public ServerInformationReply getInfo(ServerInformationRequest request) throws IOException { + public GroupListReply getGroupList(GroupListRequest request) throws IOException { return handleRequest(request, - ClientProtoUtils::toServerInformationRequestProto, - ClientProtoUtils::toServerInformationReply, - p -> getProtocol().serverInformation(null, p)); + ClientProtoUtils::toGroupListRequestProto, + ClientProtoUtils::toGroupListReply, + p -> getProtocol().groupList(null, p)); + } + + @Override + public GroupInfoReply getGroupInfo(GroupInfoRequest request) throws IOException { + return handleRequest(request, + ClientProtoUtils::toGroupInfoRequestProto, + ClientProtoUtils::toGroupInfoReply, + p -> getProtocol().groupInfo(null, p)); } static <REQUEST extends RaftClientRequest, REPLY extends RaftClientReply, http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java index 77fc8ef..9ebb88a 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServer; import org.apache.ratis.thirdparty.com.google.protobuf.RpcController; @@ -29,8 +30,11 @@ import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto; import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto; -import org.apache.ratis.proto.RaftProtos.ServerInformationRequestProto; -import org.apache.ratis.proto.RaftProtos.ServerInformationReplyProto; +import org.apache.ratis.proto.RaftProtos.GroupListRequestProto; +import org.apache.ratis.proto.RaftProtos.GroupListReplyProto; +import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto; +import org.apache.ratis.proto.RaftProtos.GroupInfoReplyProto; + @InterfaceAudience.Private public class CombinedClientProtocolServerSideTranslatorPB @@ -82,14 +86,26 @@ public class CombinedClientProtocolServerSideTranslatorPB } @Override - public ServerInformationReplyProto serverInformation( - RpcController controller, ServerInformationRequestProto proto) + public GroupListReplyProto groupList( + RpcController controller, GroupListRequestProto proto) throws ServiceException { - final ServerInformationRequest request; + final GroupListRequest request; + try { + request = ClientProtoUtils.toGroupListRequest(proto); + final GroupListReply reply = impl.getGroupList(request); + return ClientProtoUtils.toGroupListReplyProto(reply); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + } + + @Override + public GroupInfoReplyProto groupInfo(RpcController controller, GroupInfoRequestProto proto) throws ServiceException { + final GroupInfoRequest request; try { - request = ClientProtoUtils.toServerInformationRequest(proto); - final ServerInformationReply reply = impl.getInfo(request); - return ClientProtoUtils.toServerInformationReplyProto(reply); + request = ClientProtoUtils.toGroupInfoRequest(proto); + final GroupInfoReply reply = impl.getGroupInfo(request); + return ClientProtoUtils.toGroupInfoReplyProto(reply); } catch (IOException ioe) { throw new ServiceException(ioe); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java index 38b401e..ffb6dc8 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java @@ -42,8 +42,10 @@ public class HadoopClientRpc extends RaftClientRpcWithProxy<CombinedClientProtoc return proxy.groupManagement((GroupManagementRequest) request); } else if (request instanceof SetConfigurationRequest) { return proxy.setConfiguration((SetConfigurationRequest) request); - } else if (request instanceof ServerInformationRequest) { - return proxy.getInfo((ServerInformationRequest) request); + } else if (request instanceof GroupListRequest) { + return proxy.getGroupList((GroupListRequest) request); + } else if (request instanceof GroupInfoRequest) { + return proxy.getGroupInfo((GroupInfoRequest) request); } else { return proxy.submitClientRequest(request); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java index b763d58..bc6ebdc 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java @@ -52,19 +52,27 @@ public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> { (SetConfigurationRequest)request); b.setSetConfigurationRequest(proto); rpcRequest = proto.getRpcRequest(); - } else if (request instanceof ServerInformationRequest) { - final RaftProtos.ServerInformationRequestProto proto = ClientProtoUtils.toServerInformationRequestProto( - (ServerInformationRequest)request); - b.setServerInformationRequest(proto); + } else if (request instanceof GroupListRequest) { + final RaftProtos.GroupListRequestProto proto = ClientProtoUtils.toGroupListRequestProto( + (GroupListRequest)request); + b.setGroupListRequest(proto); + rpcRequest = proto.getRpcRequest(); + } else if (request instanceof GroupInfoRequest) { + final RaftProtos.GroupInfoRequestProto proto = ClientProtoUtils.toGroupInfoRequestProto( + (GroupInfoRequest)request); + b.setGroupInfoRequest(proto); rpcRequest = proto.getRpcRequest(); } else { final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request); b.setRaftClientRequest(proto); rpcRequest = proto.getRpcRequest(); } - if (request instanceof ServerInformationRequest) { - return ClientProtoUtils.toServerInformationReply( - proxy.send(rpcRequest, b.build()).getServerInfoReply()); + if (request instanceof GroupListRequest) { + return ClientProtoUtils.toGroupListReply( + proxy.send(rpcRequest, b.build()).getGroupListReply()); + } else if (request instanceof GroupInfoRequest) { + return ClientProtoUtils.toGroupInfoReply( + proxy.send(rpcRequest, b.build()).getGroupInfoReply()); } else { return ClientProtoUtils.toRaftClientReply( proxy.send(rpcRequest, b.build()).getRaftClientReply()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index adcd786..4eaeb98 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -20,9 +20,10 @@ package org.apache.ratis.netty.server; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.netty.NettyRpcProxy; +import org.apache.ratis.protocol.GroupInfoReply; +import org.apache.ratis.protocol.GroupListReply; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.protocol.ServerInformationReply; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerRpc; @@ -202,13 +203,22 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy, .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply)) .build(); } - case SERVERINFORMATIONREQUEST: { - final ServerInformationRequestProto request = proto.getServerInformationRequest(); + case GROUPLISTREQUEST: { + final GroupListRequestProto request = proto.getGroupListRequest(); rpcRequest = request.getRpcRequest(); - final ServerInformationReply reply = server.getInfo( - ClientProtoUtils.toServerInformationRequest(request)); + final GroupListReply reply = server.getGroupList( + ClientProtoUtils.toGroupListRequest(request)); return RaftNettyServerReplyProto.newBuilder() - .setServerInfoReply(ClientProtoUtils.toServerInformationReplyProto(reply)) + .setGroupListReply(ClientProtoUtils.toGroupListReplyProto(reply)) + .build(); + } + case GROUPINFOREQUEST: { + final GroupInfoRequestProto request = proto.getGroupInfoRequest(); + rpcRequest = request.getRpcRequest(); + final GroupInfoReply reply = server.getGroupInfo( + ClientProtoUtils.toGroupInfoRequest(request)); + return RaftNettyServerReplyProto.newBuilder() + .setGroupInfoReply(ClientProtoUtils.toGroupInfoReplyProto(reply)) .build(); } case RAFTNETTYSERVERREQUEST_NOT_SET: http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-proto/src/main/proto/Grpc.proto ---------------------------------------------------------------------- diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto index 3126fdb..950a73e 100644 --- a/ratis-proto/src/main/proto/Grpc.proto +++ b/ratis-proto/src/main/proto/Grpc.proto @@ -49,6 +49,9 @@ service AdminProtocolService { rpc groupManagement(ratis.common.GroupManagementRequestProto) returns(ratis.common.RaftClientReplyProto) {} - rpc serverInformation(ratis.common.ServerInformationRequestProto) - returns(ratis.common.ServerInformationReplyProto) {} + rpc groupList(ratis.common.GroupListRequestProto) + returns(ratis.common.GroupListReplyProto) {} + + rpc groupInfo(ratis.common.GroupInfoRequestProto) + returns(ratis.common.GroupInfoReplyProto) {} } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-proto/src/main/proto/Hadoop.proto ---------------------------------------------------------------------- diff --git a/ratis-proto/src/main/proto/Hadoop.proto b/ratis-proto/src/main/proto/Hadoop.proto index dd220ef..5d81b6c 100644 --- a/ratis-proto/src/main/proto/Hadoop.proto +++ b/ratis-proto/src/main/proto/Hadoop.proto @@ -34,8 +34,11 @@ service CombinedClientProtocolService { rpc groupManagement(ratis.common.GroupManagementRequestProto) returns(ratis.common.RaftClientReplyProto); - rpc serverInformation(ratis.common.ServerInformationRequestProto) - returns(ratis.common.ServerInformationReplyProto); + rpc groupList(ratis.common.GroupListRequestProto) + returns(ratis.common.GroupListReplyProto); + + rpc groupInfo(ratis.common.GroupInfoRequestProto) + returns(ratis.common.GroupInfoReplyProto); } service RaftServerProtocolService { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-proto/src/main/proto/Netty.proto ---------------------------------------------------------------------- diff --git a/ratis-proto/src/main/proto/Netty.proto b/ratis-proto/src/main/proto/Netty.proto index b817e8b..61d9b28 100644 --- a/ratis-proto/src/main/proto/Netty.proto +++ b/ratis-proto/src/main/proto/Netty.proto @@ -36,7 +36,8 @@ message RaftNettyServerRequestProto { ratis.common.RaftClientRequestProto raftClientRequest = 4; ratis.common.SetConfigurationRequestProto setConfigurationRequest = 5; ratis.common.GroupManagementRequestProto groupManagementRequest = 6; - ratis.common.ServerInformationRequestProto serverInformationRequest = 7; + ratis.common.GroupListRequestProto groupListRequest = 7; + ratis.common.GroupInfoRequestProto groupInfoRequest = 8; } } @@ -46,7 +47,8 @@ message RaftNettyServerReplyProto { ratis.common.AppendEntriesReplyProto appendEntriesReply = 2; ratis.common.InstallSnapshotReplyProto installSnapshotReply = 3; ratis.common.RaftClientReplyProto raftClientReply = 4; - ratis.common.ServerInformationReplyProto serverInfoReply = 5; - RaftNettyExceptionReplyProto exceptionReply = 6; + ratis.common.GroupListReplyProto groupListReply = 5; + ratis.common.GroupInfoReplyProto groupInfoReply = 6; + RaftNettyExceptionReplyProto exceptionReply = 7; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-proto/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index fbebe9f..13b097c 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -291,11 +291,16 @@ message GroupManagementRequestProto { } } -// server info request -message ServerInformationRequestProto { +// server info requests +message GroupListRequestProto { RaftRpcRequestProto rpcRequest = 1; } +message GroupInfoRequestProto { + RaftRpcRequestProto rpcRequest = 1; + RaftGroupIdProto groupId = 2; +} + message ServerRpcProto { RaftPeerProto id = 1; uint64 lastRpcElapsedTimeMs = 2; @@ -326,7 +331,12 @@ message RoleInfoProto { } } -message ServerInformationReplyProto { +message GroupListReplyProto { + RaftRpcReplyProto rpcReply = 1; + repeated RaftGroupIdProto groupId = 2; +} + +message GroupInfoReplyProto { RaftRpcReplyProto rpcReply = 1; RaftGroupProto group = 2; RoleInfoProto role = 3; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java index 111b31d..47055b8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java @@ -44,6 +44,9 @@ public interface RaftServer extends Closeable, RpcType.Get, /** @return the group IDs the server is part of. */ Iterable<RaftGroupId> getGroupIds() throws IOException; + /** @return the groups the server is part of. */ + Iterable<RaftGroup> getGroups() throws IOException; + /** @return the server properties. */ RaftProperties getProperties(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 896e550..e9cf119 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -40,12 +40,7 @@ import org.slf4j.LoggerFactory; import javax.management.ObjectName; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.OptionalLong; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -54,6 +49,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY; import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER; @@ -350,8 +347,17 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou return infos; } - ServerInformationReply getServerInformation(ServerInformationRequest request) { - return new ServerInformationReply(request, getRoleInfoProto(), + GroupListReply getGroupList(GroupListRequest request) { + Iterable<RaftGroupId> groupIds = null; + try { + groupIds = proxy.getGroupIds(); + } catch (IOException e) { + } + return new GroupListReply(request, groupIds); + } + + GroupInfoReply getGroupInfo(GroupInfoRequest request) { + return new GroupInfoReply(request, getRoleInfoProto(), state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(), getGroup()); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 53b77a3..7648247 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -222,6 +222,11 @@ public class RaftServerProxy implements RaftServer { } @Override + public Iterable<RaftGroup> getGroups() throws IOException { + return getImpls().stream().map(RaftServerImpl::getGroup).collect(Collectors.toList()); + } + + @Override public RpcType getRpcType() { return getFactory().getRpcType(); } @@ -385,19 +390,31 @@ public class RaftServerProxy implements RaftServer { } @Override - public ServerInformationReply getInfo(ServerInformationRequest request) + public GroupListReply getGroupList(GroupListRequest request) throws IOException { - return RaftServerImpl.waitForReply(getId(), request, getInfoAsync(request), + return RaftServerImpl.waitForReply(getId(), request, getGroupListAsync(request), r -> null); } @Override - public CompletableFuture<ServerInformationReply> getInfoAsync( - ServerInformationRequest request) { + public CompletableFuture<GroupListReply> getGroupListAsync( + GroupListRequest request) { return getImplFuture(request.getRaftGroupId()).thenApplyAsync( - server -> server.getServerInformation(request)); + server -> server.getGroupList(request)); + } + + @Override + public GroupInfoReply getGroupInfo(GroupInfoRequest request) throws IOException { + return RaftServerImpl.waitForReply(getId(), request, getGroupInfoAsync(request), r -> null); } + @Override + public CompletableFuture<GroupInfoReply> getGroupInfoAsync(GroupInfoRequest request) throws IOException { + return getImplFuture(request.getRaftGroupId()).thenApplyAsync( + server -> server.getGroupInfo(request)); + } + + /** * Handle a raft configuration change request from client. */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java index dd440cb..30aae33 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java @@ -30,6 +30,8 @@ import org.junit.Test; import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.apache.ratis.util.Preconditions.assertTrue; @@ -56,14 +58,37 @@ public abstract class ServerInformationBaseTest<CLUSTER extends MiniRaftCluster> RaftGroup group = cluster.getGroup(); List<RaftPeer> peers = cluster.getPeers(); - // check that all the peers return exactly this group when group information - // is requested. - for (RaftPeer peer : peers) { + + //Multi-raft with the second group + RaftGroup group2 = RaftGroup.valueOf(RaftGroupId.randomId(), peers); + for(RaftPeer peer : peers) { try(final RaftClient client = cluster.createClient(peer.getId())) { - RaftClientReply reply = client.serverInformation(peer.getId()); - assertTrue(reply instanceof ServerInformationReply); - ServerInformationReply info = (ServerInformationReply)reply; - assertTrue(sameGroup(group, info.getGroup())); + client.groupAdd(group2, peer.getId()); + } + } + // check that all the peers return the list where both groups are included. And able to return GroupInfo + // for each of them. + for (RaftPeer peer : peers) { + try (final RaftClient client = cluster.createClient(peer.getId())) { + RaftClientReply reply = client.getGroups(peer.getId()); + assertTrue(reply instanceof GroupListReply); + GroupListReply info = (GroupListReply) reply; + List<RaftGroupId> groupList = (StreamSupport + .stream(info.getGroupIds().spliterator(), false) + .filter(id -> group.getGroupId().equals(id)).collect(Collectors.toList())); + assert (groupList.size() == 1); + reply = client.getGroupInfo(groupList.get(0), peer.getId()); + assertTrue(reply instanceof GroupInfoReply); + GroupInfoReply gi = (GroupInfoReply) reply; + assert (sameGroup(group, gi.getGroup())); + groupList = (StreamSupport + .stream(info.getGroupIds().spliterator(), false) + .filter(id -> group2.getGroupId().equals(id)).collect(Collectors.toList())); + assert (groupList.size() == 1); + reply = client.getGroupInfo(groupList.get(0), peer.getId()); + assertTrue(reply instanceof GroupInfoReply); + gi = (GroupInfoReply) reply; + assert (sameGroup(group2, gi.getGroup())); } } @@ -89,16 +114,18 @@ public abstract class ServerInformationBaseTest<CLUSTER extends MiniRaftCluster> } } - // check serverInformation + // check getGroups for(RaftPeer peer : peers) { if (peer.getId().equals(killedFollower)) { continue; } try(final RaftClient client = cluster.createClient(peer.getId())) { - RaftClientReply reply = client.serverInformation(peer.getId()); - assertTrue(reply instanceof ServerInformationReply); - ServerInformationReply info = (ServerInformationReply)reply; - assertTrue(sameGroup(group, info.getGroup())); + RaftClientReply reply = client.getGroups(peer.getId()); + assertTrue(reply instanceof GroupListReply); + GroupListReply info = (GroupListReply)reply; + assertTrue(StreamSupport + .stream(info.getGroupIds().spliterator(), false) + .filter(id -> group.getGroupId().equals(id)).collect(Collectors.toList()).size() == 1); for(CommitInfoProto i : info.getCommitInfos()) { if (RaftPeerId.valueOf(i.getServer().getId()).equals(killedFollower)) { Assert.assertTrue(i.getCommitIndex() <= maxCommit); @@ -114,7 +141,7 @@ public abstract class ServerInformationBaseTest<CLUSTER extends MiniRaftCluster> RaftClientReply sendMessages(int n, MiniRaftCluster cluster) throws Exception { LOG.info("sendMessages: " + n); - final RaftPeerId leader = RaftTestUtil.waitForLeader(cluster).getId(); + final RaftPeerId leader = RaftTestUtil.waitForLeader(cluster, true, cluster.getGroupId()).getId(); RaftClientReply reply = null; try(final RaftClient client = cluster.createClient(leader)) { for(int i = 0; i < n; i++) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/39916f23/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java index 52de86f..e9df14e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java @@ -158,9 +158,12 @@ class SimulatedServerRpc implements RaftServerRpc { if (request instanceof GroupManagementRequest) { future = CompletableFuture.completedFuture( server.groupManagement((GroupManagementRequest) request)); - } else if (request instanceof ServerInformationRequest) { + } else if (request instanceof GroupListRequest) { future = CompletableFuture.completedFuture( - server.getInfo((ServerInformationRequest) request)); + server.getGroupList((GroupListRequest) request)); + } else if (request instanceof GroupInfoRequest) { + future = CompletableFuture.completedFuture( + server.getGroupInfo((GroupInfoRequest) request)); } else if (request instanceof SetConfigurationRequest) { future = server.setConfigurationAsync((SetConfigurationRequest) request); } else {
