Repository: incubator-ratis Updated Branches: refs/heads/master 89b1a1cd9 -> 09b099c71
RATIS-305. Replace the reinitialize() API with some group management APIs. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/09b099c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/09b099c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/09b099c7 Branch: refs/heads/master Commit: 09b099c71d4f112e4c7802cab1b48f2b84e0620b Parents: 89b1a1c Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Mon Sep 10 16:15:53 2018 -0700 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Mon Sep 10 16:15:53 2018 -0700 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 7 +- .../ratis/client/impl/ClientProtoUtils.java | 41 ++-- .../ratis/client/impl/RaftClientImpl.java | 15 +- .../protocol/AdminAsynchronousProtocol.java | 5 +- .../apache/ratis/protocol/AdminProtocol.java | 4 +- .../ratis/protocol/GroupManagementRequest.java | 92 ++++++++ .../ratis/protocol/RaftClientMessage.java | 4 +- .../ratis/protocol/ReinitializeRequest.java | 37 ---- .../test/java/org/apache/ratis/BaseTest.java | 6 +- .../org/apache/ratis/TestMultiRaftGroup.java | 4 +- .../apache/ratis/grpc/client/GrpcClientRpc.java | 33 +-- .../grpc/client/RaftClientProtocolClient.java | 5 +- .../ratis/grpc/server/AdminProtocolService.java | 11 +- .../ratis/grpc/TestGroupManagementWithGrpc.java | 28 +++ .../grpc/TestReinitializationWithGrpc.java | 28 --- ...nedClientProtocolClientSideTranslatorPB.java | 8 +- ...nedClientProtocolServerSideTranslatorPB.java | 11 +- .../ratis/hadooprpc/client/HadoopClientRpc.java | 4 +- .../TestGroupManagementWithHadoopRpc.java | 28 +++ .../TestReinitializationWithHadoopRpc.java | 28 --- .../ratis/netty/client/NettyClientRpc.java | 10 +- .../ratis/netty/server/NettyRpcService.java | 8 +- .../netty/TestGroupManagementWithNetty.java | 28 +++ .../netty/TestReinitializationWithNetty.java | 28 --- ratis-proto-shaded/src/main/proto/GRpc.proto | 4 +- ratis-proto-shaded/src/main/proto/Hadoop.proto | 2 +- ratis-proto-shaded/src/main/proto/Netty.proto | 2 +- ratis-proto-shaded/src/main/proto/Raft.proto | 18 +- .../ratis/server/impl/RaftServerProxy.java | 65 ++++-- .../org/apache/ratis/RaftExceptionBaseTest.java | 4 +- .../server/impl/GroupManagementBaseTest.java | 218 +++++++++++++++++++ .../server/impl/ReinitializationBaseTest.java | 218 ------------------- .../server/simulation/SimulatedServerRpc.java | 4 +- .../TestGroupManagementWithSimulatedRpc.java | 28 +++ .../TestReinitializationWithSimulatedRpc.java | 28 --- 35 files changed, 592 insertions(+), 472 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 5562f59..ead9155 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -89,8 +89,11 @@ public interface RaftClient extends Closeable { /** Send set configuration request to the raft service. */ RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException; - /** Send reinitialize request to the given server (not the raft service). */ - RaftClientReply reinitialize(RaftGroup newGroup, RaftPeerId server) throws IOException; + /** Send groupAdd request to the given server (not the raft service). */ + RaftClientReply groupAdd(RaftGroup newGroup, RaftPeerId server) throws IOException; + + /** Send groupRemove request to the given server (not the raft service). */ + RaftClientReply groupRemove(RaftGroupId groupId, RaftPeerId server) throws IOException; /** Send serverInformation request to the given server.*/ RaftClientReply serverInformation(RaftPeerId server) throws IOException; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 1c41a3b..7065dd4 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -293,15 +293,20 @@ public interface ClientProtoUtils { .build(); } - static ReinitializeRequest toReinitializeRequest( - ReinitializeRequestProto p) { + static GroupManagementRequest toGroupManagementRequest(GroupManagementRequestProto p) { final RaftRpcRequestProto m = p.getRpcRequest(); - return new ReinitializeRequest( - ClientId.valueOf(m.getRequestorId()), - RaftPeerId.valueOf(m.getReplyId()), - ProtoUtils.toRaftGroupId(m.getRaftGroupId()), - m.getCallId(), - ProtoUtils.toRaftGroup(p.getGroup())); + final ClientId clientId = ClientId.valueOf(m.getRequestorId()); + final RaftPeerId serverId = RaftPeerId.valueOf(m.getReplyId()); + switch(p.getOpCase()) { + case GROUPADD: + return GroupManagementRequest.newAdd(clientId, serverId, m.getCallId(), + ProtoUtils.toRaftGroup(p.getGroupAdd().getGroup())); + case GROUPREMOVE: + return GroupManagementRequest.newRemove(clientId, serverId, m.getCallId(), + ProtoUtils.toRaftGroupId(p.getGroupRemove().getGroupId())); + default: + throw new IllegalArgumentException("Unexpected op " + p.getOpCase() + " in " + p); + } } static ServerInformationRequest toServerInformationRequest( @@ -314,12 +319,20 @@ public interface ClientProtoUtils { m.getCallId()); } - static ReinitializeRequestProto toReinitializeRequestProto( - ReinitializeRequest request) { - return ReinitializeRequestProto.newBuilder() - .setRpcRequest(toRaftRpcRequestProtoBuilder(request)) - .setGroup(ProtoUtils.toRaftGroupProtoBuilder(request.getGroup())) - .build(); + static GroupManagementRequestProto toGroupManagementRequestProto(GroupManagementRequest request) { + final GroupManagementRequestProto.Builder b = GroupManagementRequestProto.newBuilder() + .setRpcRequest(toRaftRpcRequestProtoBuilder(request)); + final GroupManagementRequest.Add add = request.getAdd(); + if (add != null) { + b.setGroupAdd(GroupAddRequestProto.newBuilder().setGroup( + ProtoUtils.toRaftGroupProtoBuilder(add.getGroup())).build()); + } + final GroupManagementRequest.Remove remove = request.getRemove(); + if (remove != null) { + b.setGroupRemove(GroupRemoveRequestProto.newBuilder().setGroupId( + ProtoUtils.toRaftGroupIdProtoBuilder(remove.getGroupId())).build()); + } + return b.build(); } static ServerInformationRequestProto toServerInformationRequestProto( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index eb78463..9419c7f 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -206,15 +206,22 @@ final class RaftClientImpl implements RaftClient { } @Override - public RaftClientReply reinitialize(RaftGroup newGroup, RaftPeerId server) - throws IOException { + public RaftClientReply groupAdd(RaftGroup newGroup, RaftPeerId server) throws IOException { Objects.requireNonNull(newGroup, "newGroup == null"); Objects.requireNonNull(server, "server == null"); final long callId = nextCallId(); addServers(newGroup.getPeers().stream()); - return sendRequest(new ReinitializeRequest( - clientId, server, groupId, callId, newGroup)); + return sendRequest(GroupManagementRequest.newAdd(clientId, server, callId, newGroup)); + } + + @Override + public RaftClientReply groupRemove(RaftGroupId groupId, RaftPeerId server) throws IOException { + Objects.requireNonNull(groupId, "groupId == null"); + Objects.requireNonNull(server, "server == null"); + + final long callId = nextCallId(); + return sendRequest(GroupManagementRequest.newRemove(clientId, server, callId, groupId)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java index 4907386..6c545a8 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java @@ -22,9 +22,8 @@ import java.util.concurrent.CompletableFuture; /** Asynchronous version of {@link AdminProtocol}. */ public interface AdminAsynchronousProtocol { - CompletableFuture<RaftClientReply> reinitializeAsync( - ReinitializeRequest request) throws IOException; - CompletableFuture<ServerInformationReply> getInfoAsync( ServerInformationRequest request) throws IOException; + + CompletableFuture<RaftClientReply> groupManagementAsync(GroupManagementRequest request); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java index a480953..03bb266 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java @@ -21,7 +21,7 @@ import java.io.IOException; /** For server administration. */ public interface AdminProtocol { - RaftClientReply reinitialize(ReinitializeRequest request) throws IOException; - ServerInformationReply getInfo(ServerInformationRequest request) throws IOException; + + RaftClientReply groupManagement(GroupManagementRequest request) throws IOException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java new file mode 100644 index 0000000..8577548 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +public class GroupManagementRequest extends RaftClientRequest { + public static abstract class Op { + public abstract RaftGroupId getGroupId(); + } + + public static class Add extends Op { + private final RaftGroup group; + + public Add(RaftGroup group) { + this.group = group; + } + + @Override + public RaftGroupId getGroupId() { + return getGroup().getGroupId(); + } + + public RaftGroup getGroup() { + return group; + } + + @Override + public String toString() { + return getClass().getSimpleName() + ":" + getGroup(); + } + } + + public static class Remove extends Op { + private final RaftGroupId groupId; + + public Remove(RaftGroupId groupId) { + this.groupId = groupId; + } + + @Override + public RaftGroupId getGroupId() { + return groupId; + } + + @Override + public String toString() { + return getClass().getSimpleName() + ":" + getGroupId(); + } + } + + public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId, RaftGroup group) { + return new GroupManagementRequest(clientId, serverId, callId, new Add(group)); + } + + public static GroupManagementRequest newRemove(ClientId clientId, RaftPeerId serverId, long callId, RaftGroupId groupId) { + return new GroupManagementRequest(clientId, serverId, callId, new Remove(groupId)); + } + + private final Op op; + + private GroupManagementRequest(ClientId clientId, RaftPeerId serverId, long callId, Op op) { + super(clientId, serverId, op.getGroupId(), callId); + this.op = op; + } + + public Add getAdd() { + return op instanceof Add? (Add)op: null; + } + + public Remove getRemove() { + return op instanceof Remove? (Remove)op: null; + } + + @Override + public String toString() { + return super.toString() + ", " + op; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java index 07354d4..8a2e4a9 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java @@ -54,7 +54,7 @@ public abstract class RaftClientMessage implements RaftRpcMessage { @Override public String toString() { - return getClass().getSimpleName() + "(" + clientId + "->" + serverId - + ") in " + groupId; + return getClass().getSimpleName() + ":" + clientId + "->" + serverId + + (groupId != null? "@" + groupId: ""); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java deleted file mode 100644 index b0e69af..0000000 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.protocol; - -public class ReinitializeRequest extends RaftClientRequest { - private final RaftGroup group; - - public ReinitializeRequest(ClientId clientId, RaftPeerId serverId, - RaftGroupId groupId, long callId, RaftGroup group) { - super(clientId, serverId, groupId, callId); - this.group = group; - } - - public RaftGroup getGroup() { - return group; - } - - @Override - public String toString() { - return super.toString() + ", " + getGroup(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-common/src/test/java/org/apache/ratis/BaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java index e487841..7efd9a7 100644 --- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java +++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java @@ -98,12 +98,16 @@ public abstract class BaseTest { String description, CheckedRunnable<?> testCode, Class<? extends Throwable> exceptedThrowableClass, Logger log, Class<? extends Throwable>... exceptedCauseClasses) { + boolean caught = false; try { testCode.run(); - Assert.fail("The test \"" + description + "\" does not throw anything."); } catch (Throwable t) { + caught = true; assertThrowable(description, t, exceptedThrowableClass, log, exceptedCauseClasses); } + if (!caught) { + Assert.fail("The test \"" + description + "\" does not throw anything."); + } } public void testFailureCase( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java index b3b0998..030badd 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java +++ b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java @@ -25,7 +25,7 @@ import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine; import org.apache.ratis.examples.arithmetic.TestArithmetic; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.ReinitializationBaseTest; +import org.apache.ratis.server.impl.GroupManagementBaseTest; import org.apache.ratis.util.CheckedBiConsumer; import org.apache.ratis.util.LogUtils; import org.junit.Test; @@ -70,7 +70,7 @@ public class TestMultiRaftGroup extends BaseTest { } }; - ReinitializationBaseTest.runTestReinitializeMultiGroups( + GroupManagementBaseTest.runMultiGroupTest( cluster, idIndex, chosen, checker); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index 8c26c7f..160ae16 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -22,11 +22,18 @@ import org.apache.ratis.client.impl.RaftClientRpcWithProxy; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.protocol.*; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.GroupManagementRequest; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.ServerInformationRequest; +import org.apache.ratis.protocol.SetConfigurationRequest; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.shaded.proto.RaftProtos; +import org.apache.ratis.shaded.proto.RaftProtos.GroupManagementRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; @@ -39,8 +46,6 @@ import java.io.InterruptedIOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import static org.apache.ratis.client.impl.ClientProtoUtils.*; - public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClient> { public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class); @@ -71,19 +76,17 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie throws IOException { final RaftPeerId serverId = request.getServerId(); final RaftClientProtocolClient proxy = getProxies().getProxy(serverId); - if (request instanceof ReinitializeRequest) { - RaftProtos.ReinitializeRequestProto proto = - toReinitializeRequestProto((ReinitializeRequest) request); - return toRaftClientReply(proxy.reinitialize(proto)); + if (request instanceof GroupManagementRequest) { + final GroupManagementRequestProto proto = ClientProtoUtils.toGroupManagementRequestProto((GroupManagementRequest)request); + return ClientProtoUtils.toRaftClientReply(proxy.groupAdd(proto)); } else if (request instanceof SetConfigurationRequest) { - SetConfigurationRequestProto setConf = - toSetConfigurationRequestProto((SetConfigurationRequest) request); - return toRaftClientReply(proxy.setConfiguration(setConf)); + final SetConfigurationRequestProto setConf = ClientProtoUtils.toSetConfigurationRequestProto( + (SetConfigurationRequest) request); + return ClientProtoUtils.toRaftClientReply(proxy.setConfiguration(setConf)); } else if (request instanceof ServerInformationRequest){ - RaftProtos.ServerInformationRequestProto proto = - toServerInformationRequestProto((ServerInformationRequest) request); - return ClientProtoUtils.toServerInformationReply( - proxy.serverInformation(proto)); + final ServerInformationRequestProto proto = ClientProtoUtils.toServerInformationRequestProto( + (ServerInformationRequest) request); + return ClientProtoUtils.toServerInformationReply(proxy.serverInformation(proto)); } else { final CompletableFuture<RaftClientReply> f = sendRequest(request, proxy); // TODO: timeout support http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java index 2d095ab..11f2676 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java @@ -97,11 +97,10 @@ public class RaftClientProtocolClient implements Closeable { channel.shutdownNow(); } - RaftClientReplyProto reinitialize( - ReinitializeRequestProto request) throws IOException { + RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException { return blockingCall(() -> adminBlockingStub .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) - .reinitialize(request)); + .groupManagement(request)); } ServerInformationReplyProto serverInformation( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java index c7ba297..d65abd0 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java @@ -20,11 +20,11 @@ package org.apache.ratis.grpc.server; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.grpc.RaftGrpcUtil; import org.apache.ratis.protocol.AdminAsynchronousProtocol; -import org.apache.ratis.protocol.ReinitializeRequest; +import org.apache.ratis.protocol.GroupManagementRequest; import org.apache.ratis.protocol.ServerInformationRequest; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.GroupManagementRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto; import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceImplBase; @@ -37,10 +37,9 @@ public class AdminProtocolService extends AdminProtocolServiceImplBase { } @Override - public void reinitialize(ReinitializeRequestProto proto, - StreamObserver<RaftClientReplyProto> responseObserver) { - final ReinitializeRequest request = ClientProtoUtils.toReinitializeRequest(proto); - RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.reinitializeAsync(request), + public void groupManagement(GroupManagementRequestProto proto, StreamObserver<RaftClientReplyProto> responseObserver) { + final GroupManagementRequest request = ClientProtoUtils.toGroupManagementRequest(proto); + RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.groupManagementAsync(request), ClientProtoUtils::toRaftClientReplyProto); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java new file mode 100644 index 0000000..0b5e2a9 --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.server.impl.GroupManagementBaseTest; + +public class TestGroupManagementWithGrpc extends GroupManagementBaseTest { + @Override + public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() { + return MiniRaftClusterWithGRpc.FACTORY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java deleted file mode 100644 index 27cbf1e..0000000 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc; - -import org.apache.ratis.MiniRaftCluster; -import org.apache.ratis.server.impl.ReinitializationBaseTest; - -public class TestReinitializationWithGrpc extends ReinitializationBaseTest { - @Override - public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() { - return MiniRaftClusterWithGRpc.FACTORY; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java index c47a69b..43fcca1 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java @@ -23,7 +23,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.hadooprpc.Proxy; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.ReinitializeRequest; +import org.apache.ratis.protocol.GroupManagementRequest; import org.apache.ratis.protocol.ServerInformationRequest; import org.apache.ratis.protocol.ServerInformationReply; import org.apache.ratis.protocol.SetConfigurationRequest; @@ -67,11 +67,11 @@ public class CombinedClientProtocolClientSideTranslatorPB } @Override - public RaftClientReply reinitialize(ReinitializeRequest request) throws IOException { + public RaftClientReply groupManagement(GroupManagementRequest request) throws IOException { return handleRequest(request, - ClientProtoUtils::toReinitializeRequestProto, + ClientProtoUtils::toGroupManagementRequestProto, ClientProtoUtils::toRaftClientReply, - p -> getProtocol().reinitialize(null, p)); + p -> getProtocol().groupManagement(null, p)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java index 5b88208..c47e5b0 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java @@ -28,7 +28,7 @@ import org.apache.ratis.shaded.com.google.protobuf.ServiceException; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.GroupManagementRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto; @@ -69,13 +69,12 @@ public class CombinedClientProtocolServerSideTranslatorPB } @Override - public RaftClientReplyProto reinitialize( - RpcController controller, ReinitializeRequestProto proto) + public RaftClientReplyProto groupManagement(RpcController controller, GroupManagementRequestProto proto) throws ServiceException { - final ReinitializeRequest request; + final GroupManagementRequest request; try { - request = ClientProtoUtils.toReinitializeRequest(proto); - final RaftClientReply reply = impl.reinitialize(request); + request = ClientProtoUtils.toGroupManagementRequest(proto); + final RaftClientReply reply = impl.groupManagement(request); return ClientProtoUtils.toRaftClientReplyProto(reply); } catch(IOException ioe) { throw new ServiceException(ioe); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java index e35875c..38b401e 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java @@ -38,8 +38,8 @@ public class HadoopClientRpc extends RaftClientRpcWithProxy<CombinedClientProtoc final CombinedClientProtocolClientSideTranslatorPB proxy = getProxies().getProxy(serverId); try { - if (request instanceof ReinitializeRequest) { - return proxy.reinitialize((ReinitializeRequest) request); + if (request instanceof GroupManagementRequest) { + return proxy.groupManagement((GroupManagementRequest) request); } else if (request instanceof SetConfigurationRequest) { return proxy.setConfiguration((SetConfigurationRequest) request); } else if (request instanceof ServerInformationRequest) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestGroupManagementWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestGroupManagementWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestGroupManagementWithHadoopRpc.java new file mode 100644 index 0000000..af0074f --- /dev/null +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestGroupManagementWithHadoopRpc.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.server.impl.GroupManagementBaseTest; + +public class TestGroupManagementWithHadoopRpc extends GroupManagementBaseTest { + @Override + public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() { + return MiniRaftClusterWithHadoopRpc.FACTORY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java deleted file mode 100644 index 6efb012..0000000 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.hadooprpc; - -import org.apache.ratis.MiniRaftCluster; -import org.apache.ratis.server.impl.ReinitializationBaseTest; - -public class TestReinitializationWithHadoopRpc extends ReinitializationBaseTest { - @Override - public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() { - return MiniRaftClusterWithHadoopRpc.FACTORY; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java index f4c1f11..3bc2608 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java @@ -24,7 +24,7 @@ import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.proto.RaftProtos; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.GroupManagementRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto; @@ -42,10 +42,10 @@ public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> { final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder(); final RaftRpcRequestProto rpcRequest; - if (request instanceof ReinitializeRequest) { - final ReinitializeRequestProto proto = ClientProtoUtils.toReinitializeRequestProto( - (ReinitializeRequest)request); - b.setReinitializeRequest(proto); + if (request instanceof GroupManagementRequest) { + final GroupManagementRequestProto proto = ClientProtoUtils.toGroupManagementRequestProto( + (GroupManagementRequest)request); + b.setGroupManagementRequest(proto); rpcRequest = proto.getRpcRequest(); } else if (request instanceof SetConfigurationRequest) { final SetConfigurationRequestProto proto = ClientProtoUtils.toSetConfigurationRequestProto( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index 45c0b77..7dba943 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -193,11 +193,11 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy, .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply)) .build(); } - case REINITIALIZEREQUEST: { - final ReinitializeRequestProto request = proto.getReinitializeRequest(); + case GROUPMANAGEMENTREQUEST: { + final GroupManagementRequestProto request = proto.getGroupManagementRequest(); rpcRequest = request.getRpcRequest(); - final RaftClientReply reply = server.reinitialize( - ClientProtoUtils.toReinitializeRequest(request)); + final RaftClientReply reply = server.groupManagement( + ClientProtoUtils.toGroupManagementRequest(request)); return RaftNettyServerReplyProto.newBuilder() .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply)) .build(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-netty/src/test/java/org/apache/ratis/netty/TestGroupManagementWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestGroupManagementWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestGroupManagementWithNetty.java new file mode 100644 index 0000000..e049e32 --- /dev/null +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestGroupManagementWithNetty.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.server.impl.GroupManagementBaseTest; + +public class TestGroupManagementWithNetty extends GroupManagementBaseTest { + @Override + public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() { + return MiniRaftClusterWithNetty.FACTORY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java deleted file mode 100644 index c378749..0000000 --- a/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.netty; - -import org.apache.ratis.MiniRaftCluster; -import org.apache.ratis.server.impl.ReinitializationBaseTest; - -public class TestReinitializationWithNetty extends ReinitializationBaseTest { - @Override - public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() { - return MiniRaftClusterWithNetty.FACTORY; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-proto-shaded/src/main/proto/GRpc.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/GRpc.proto b/ratis-proto-shaded/src/main/proto/GRpc.proto index 375079f..d7e550e 100644 --- a/ratis-proto-shaded/src/main/proto/GRpc.proto +++ b/ratis-proto-shaded/src/main/proto/GRpc.proto @@ -45,8 +45,8 @@ service RaftServerProtocolService { } service AdminProtocolService { - // A client-to-server RPC to reinitialize the server - rpc reinitialize(ratis.common.ReinitializeRequestProto) + // A client-to-server RPC to add a new group + rpc groupManagement(ratis.common.GroupManagementRequestProto) returns(ratis.common.RaftClientReplyProto) {} rpc serverInformation(ratis.common.ServerInformationRequestProto) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-proto-shaded/src/main/proto/Hadoop.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Hadoop.proto b/ratis-proto-shaded/src/main/proto/Hadoop.proto index 872c455..0d6107e 100644 --- a/ratis-proto-shaded/src/main/proto/Hadoop.proto +++ b/ratis-proto-shaded/src/main/proto/Hadoop.proto @@ -31,7 +31,7 @@ service CombinedClientProtocolService { rpc setConfiguration(ratis.common.SetConfigurationRequestProto) returns(ratis.common.RaftClientReplyProto); - rpc reinitialize(ratis.common.ReinitializeRequestProto) + rpc groupManagement(ratis.common.GroupManagementRequestProto) returns(ratis.common.RaftClientReplyProto); rpc serverInformation(ratis.common.ServerInformationRequestProto) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-proto-shaded/src/main/proto/Netty.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Netty.proto b/ratis-proto-shaded/src/main/proto/Netty.proto index c034dd5..40aa498 100644 --- a/ratis-proto-shaded/src/main/proto/Netty.proto +++ b/ratis-proto-shaded/src/main/proto/Netty.proto @@ -35,7 +35,7 @@ message RaftNettyServerRequestProto { ratis.common.InstallSnapshotRequestProto installSnapshotRequest = 3; ratis.common.RaftClientRequestProto raftClientRequest = 4; ratis.common.SetConfigurationRequestProto setConfigurationRequest = 5; - ratis.common.ReinitializeRequestProto reinitializeRequest = 6; + ratis.common.GroupManagementRequestProto groupManagementRequest = 6; ratis.common.ServerInformationRequestProto serverInformationRequest = 7; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index 039a3f6..3d7eab3 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -244,10 +244,22 @@ message SetConfigurationRequestProto { repeated RaftPeerProto peers = 2; } -// reinitialize request -message ReinitializeRequestProto { +// A request to add a new group +message GroupAddRequestProto { + RaftGroupProto group = 1; // the group to be added. +} + +message GroupRemoveRequestProto { + RaftGroupIdProto groupId = 1; // the group to be removed. +} + +message GroupManagementRequestProto { RaftRpcRequestProto rpcRequest = 1; - RaftGroupProto group = 2; // the target group. + + oneof Op { + GroupAddRequestProto groupAdd = 2; + GroupRemoveRequestProto groupRemove = 3; + } } // server info request http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 27ec67f..acec50a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -39,6 +39,7 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; @@ -47,7 +48,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; public class RaftServerProxy implements RaftServer { @@ -148,7 +148,6 @@ public class RaftServerProxy implements RaftServer { private final ServerFactory factory; private final ImplMap impls = new ImplMap(); - private final AtomicReference<ReinitializeRequest> reinitializeRequest = new AtomicReference<>(); RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters) { @@ -296,24 +295,37 @@ public class RaftServerProxy implements RaftServer { } @Override - public RaftClientReply reinitialize(ReinitializeRequest request) throws IOException { - return RaftServerImpl.waitForReply(getId(), request, reinitializeAsync(request), + public RaftClientReply groupManagement(GroupManagementRequest request) throws IOException { + return RaftServerImpl.waitForReply(getId(), request, groupManagementAsync(request), e -> new RaftClientReply(request, e, null)); } @Override - public CompletableFuture<RaftClientReply> reinitializeAsync( - ReinitializeRequest request) throws IOException { - LOG.info("{}: reinitialize* {}", getId(), request); - if (!reinitializeRequest.compareAndSet(null, request)) { - throw new IOException("Another reinitialize is already in progress."); + public CompletableFuture<RaftClientReply> groupManagementAsync(GroupManagementRequest request) { + final RaftGroupId groupId = request.getRaftGroupId(); + if (groupId == null) { + return JavaUtils.completeExceptionally(new GroupMismatchException( + getId() + ": Request group id == null")); } - final RaftGroupId oldGroupId = request.getRaftGroupId(); - return getImplFuture(oldGroupId) - .thenAcceptAsync(RaftServerImpl::shutdown) - .thenAccept(_1 -> impls.remove(oldGroupId)) - .thenCompose(_1 -> impls.addNew(request.getGroup())) - .thenApply(newImpl -> { + final GroupManagementRequest.Add add = request.getAdd(); + if (add != null) { + return groupdAddAsync(request, add.getGroup()); + } + final GroupManagementRequest.Remove remove = request.getRemove(); + if (remove != null) { + return groupRemoveAsync(request, remove.getGroupId()); + } + return JavaUtils.completeExceptionally(new UnsupportedOperationException( + getId() + ": Request not supported " + request)); + } + + private CompletableFuture<RaftClientReply> groupdAddAsync(GroupManagementRequest request, RaftGroup newGroup) { + if (!request.getRaftGroupId().equals(newGroup.getGroupId())) { + return JavaUtils.completeExceptionally(new GroupMismatchException( + getId() + ": Request group id (" + request.getRaftGroupId() + ") does not match the new group " + newGroup)); + } + return impls.addNew(newGroup) + .thenApplyAsync(newImpl -> { LOG.debug("{}: newImpl = {}", getId(), newImpl); final boolean started = newImpl.start(); Preconditions.assertTrue(started, () -> getId()+ ": failed to start a new impl: " + newImpl); @@ -321,14 +333,29 @@ public class RaftServerProxy implements RaftServer { }) .whenComplete((_1, throwable) -> { if (throwable != null) { - impls.remove(request.getGroup().getGroupId()); - LOG.warn(getId() + ": Failed reinitialize* " + request, throwable); + impls.remove(newGroup.getGroupId()); + LOG.warn(getId() + ": Failed groupAdd* " + request, throwable); } - - reinitializeRequest.set(null); }); } + private CompletableFuture<RaftClientReply> groupRemoveAsync(RaftClientRequest request, RaftGroupId groupId) { + if (!request.getRaftGroupId().equals(groupId)) { + return JavaUtils.completeExceptionally(new GroupMismatchException( + getId() + ": Request group id (" + request.getRaftGroupId() + ") does not match the given group id " + groupId)); + } + final CompletableFuture<RaftServerImpl> f = impls.remove(groupId); + if (f == null) { + return JavaUtils.completeExceptionally(new GroupMismatchException( + getId() + ": Group " + groupId + " not found.")); + } + return f.thenApply(impl -> { + final Collection<CommitInfoProto> commitInfos = impl.getCommitInfos(); + impl.shutdown(); + return new RaftClientReply(request, commitInfos); + }); + } + @Override public ServerInformationReply getInfo(ServerInformationRequest request) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java index a38488a..30d9d10 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java @@ -191,8 +191,8 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster> () -> client.setConfiguration(RaftPeer.emptyArray()), GroupMismatchException.class); - testFailureCase("reinitialize(..) with client group being different from the server group", - () -> client.reinitialize(anotherGroup, clusterGroup.getPeers().iterator().next().getId()), + testFailureCase("groupRemove(..) with another group id", + () -> client.groupRemove(anotherGroup.getGroupId(), clusterGroup.getPeers().iterator().next().getId()), GroupMismatchException.class); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java new file mode 100644 index 0000000..5310b94 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.log4j.Level; +import org.apache.ratis.BaseTest; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.util.CheckedBiConsumer; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.LogUtils; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public abstract class GroupManagementBaseTest extends BaseTest { + static final Logger LOG = LoggerFactory.getLogger(GroupManagementBaseTest.class); + + { + LogUtils.setLogLevel(RaftServerProxy.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + } + + static final RaftProperties prop = new RaftProperties(); + + public abstract MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory(); + + public MiniRaftCluster getCluster(int peerNum) throws IOException { + return getClusterFactory().newCluster(peerNum, prop); + } + + @Test + public void testMultiGroup() throws Exception { + final MiniRaftCluster cluster = getCluster(0); + LOG.info("Start testMultiGroup" + cluster.printServers()); + + // Start server with an empty conf + final RaftGroupId groupId = cluster.getGroupId(); + final RaftGroup group = new RaftGroup(groupId); + + final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0)) + .map(RaftPeerId::valueOf).collect(Collectors.toList()); + ids.forEach(id -> cluster.putNewServer(id, group, true)); + LOG.info("putNewServer: " + cluster.printServers()); + + cluster.start(); + + // Make sure that there are no leaders. + TimeUnit.SECONDS.sleep(1); + LOG.info("start: " + cluster.printServers()); + Assert.assertNull(cluster.getLeader()); + + // Add groups + final RaftGroup newGroup = new RaftGroup(RaftGroupId.randomId(), cluster.getPeers()); + LOG.info("add new group: " + newGroup); + final RaftClient client = cluster.createClient(newGroup); + for(RaftPeer p : newGroup.getPeers()) { + client.groupAdd(newGroup, p.getId()); + } + Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true)); + cluster.shutdown(); + } + + @Test + public void testMultiGroup5Nodes() throws Exception { + final int[] idIndex = {3, 4, 5}; + runMultiGroupTest(idIndex, 0); + } + + @Test + public void testMultiGroup7Nodes() throws Exception { + final int[] idIndex = {1, 6, 7}; + runMultiGroupTest(idIndex, 1); + } + + @Test + public void testMultiGroup9Nodes() throws Exception { + final int[] idIndex = {5, 8, 9}; + runMultiGroupTest(idIndex, 2); + } + + private void runMultiGroupTest(int[] idIndex, int chosen) throws Exception { + printThreadCount(null, "init"); + runMultiGroupTest(getCluster(0), idIndex, chosen, NOOP); + } + + static final CheckedBiConsumer<MiniRaftCluster, RaftGroup, RuntimeException> NOOP = (c, g) -> {}; + + public static <T extends Throwable> void runMultiGroupTest( + MiniRaftCluster cluster, int[] idIndex, int chosen, + CheckedBiConsumer<MiniRaftCluster, RaftGroup, T> checker) + throws IOException, InterruptedException, T { + if (chosen < 0) { + chosen = ThreadLocalRandom.current().nextInt(idIndex.length); + } + final String type = cluster.getClass().getSimpleName() + + Arrays.toString(idIndex) + "chosen=" + chosen; + LOG.info("\n\nrunMultiGroupTest with " + type + ": " + cluster.printServers()); + + // Start server with an empty conf + final RaftGroup emptyGroup = new RaftGroup(cluster.getGroupId()); + + final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(idIndex[idIndex.length - 1], 0)) + .map(RaftPeerId::valueOf).collect(Collectors.toList()); + LOG.info("ids: " + ids); + ids.forEach(id -> cluster.putNewServer(id, emptyGroup, true)); + LOG.info("putNewServer: " + cluster.printServers()); + + TimeUnit.SECONDS.sleep(1); + cluster.start(); + + // Make sure that there are no leaders. + TimeUnit.SECONDS.sleep(1); + LOG.info("start: " + cluster.printServers()); + Assert.assertNull(cluster.getLeader()); + + // Reinitialize servers to three groups + final List<RaftPeer> allPeers = cluster.getPeers(); + Collections.sort(allPeers, Comparator.comparing(p -> p.getId().toString())); + final RaftGroup[] groups = new RaftGroup[idIndex.length]; + for (int i = 0; i < idIndex.length; i++) { + final RaftGroupId gid = RaftGroupId.randomId(); + final int previous = i == 0 ? 0 : idIndex[i - 1]; + final RaftPeer[] peers = allPeers.subList(previous, idIndex[i]).toArray(RaftPeer.emptyArray()); + groups[i] = new RaftGroup(gid, peers); + + LOG.info(i + ") starting " + groups[i]); + for(RaftPeer p : peers) { + try(final RaftClient client = cluster.createClient(p.getId(), emptyGroup)) { + client.groupAdd(groups[i], p.getId()); + } + } + Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true, gid)); + checker.accept(cluster, groups[i]); + } + printThreadCount(type, "start groups"); + LOG.info("start groups: " + cluster.printServers()); + + // randomly remove two of the groups + LOG.info("chosen = " + chosen + ", " + groups[chosen]); + + for (int i = 0; i < groups.length; i++) { + if (i != chosen) { + final RaftGroup g = groups[i]; + LOG.info(i + ") close " + cluster.printServers(g.getGroupId())); + for(RaftPeer p : g.getPeers()) { + try (final RaftClient client = cluster.createClient(p.getId(), g)) { + client.groupRemove(g.getGroupId(), p.getId()); + } + } + } + } + printThreadCount(type, "close groups"); + LOG.info("close groups: " + cluster.printServers()); + + // update chosen group to use all the peers + final RaftGroup newGroup = new RaftGroup(groups[chosen].getGroupId()); + for(int i = 0; i < groups.length; i++) { + if (i != chosen) { + LOG.info(i + ") groupAdd: " + cluster.printServers(groups[i].getGroupId())); + for (RaftPeer p : groups[i].getPeers()) { + try (final RaftClient client = cluster.createClient(p.getId(), groups[i])) { + client.groupAdd(newGroup, p.getId()); + } + } + } + } + LOG.info(chosen + ") setConfiguration: " + cluster.printServers(groups[chosen].getGroupId())); + try (final RaftClient client = cluster.createClient(groups[chosen])) { + client.setConfiguration(allPeers.toArray(RaftPeer.emptyArray())); + } + + Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true)); + checker.accept(cluster, groups[chosen]); + LOG.info("update groups: " + cluster.printServers()); + printThreadCount(type, "update groups"); + + cluster.shutdown(); + printThreadCount(type, "shutdown"); + } + + static void printThreadCount(String type, String label) { + System.out.println("| " + type + " | " + label + " | " + + JavaUtils.getRootThreadGroup().activeCount() + " |"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java deleted file mode 100644 index f6e417c..0000000 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java +++ /dev/null @@ -1,218 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.server.impl; - -import org.apache.log4j.Level; -import org.apache.ratis.BaseTest; -import org.apache.ratis.MiniRaftCluster; -import org.apache.ratis.RaftTestUtil; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.util.CheckedBiConsumer; -import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.LogUtils; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -public abstract class ReinitializationBaseTest extends BaseTest { - static final Logger LOG = LoggerFactory.getLogger(ReinitializationBaseTest.class); - - { - LogUtils.setLogLevel(RaftServerProxy.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); - } - - static final RaftProperties prop = new RaftProperties(); - - public abstract MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory(); - - public MiniRaftCluster getCluster(int peerNum) throws IOException { - return getClusterFactory().newCluster(peerNum, prop); - } - - @Test - public void testReinitialize() throws Exception { - final MiniRaftCluster cluster = getCluster(0); - LOG.info("Start testReinitialize" + cluster.printServers()); - - // Start server with an empty conf - final RaftGroupId groupId = cluster.getGroupId(); - final RaftGroup group = new RaftGroup(groupId); - - final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0)) - .map(RaftPeerId::valueOf).collect(Collectors.toList()); - ids.forEach(id -> cluster.putNewServer(id, group, true)); - LOG.info("putNewServer: " + cluster.printServers()); - - cluster.start(); - - // Make sure that there are no leaders. - TimeUnit.SECONDS.sleep(1); - LOG.info("start: " + cluster.printServers()); - Assert.assertNull(cluster.getLeader()); - - // Reinitialize servers - final RaftGroup newGroup = new RaftGroup(groupId, cluster.getPeers()); - final RaftClient client = cluster.createClient(newGroup); - for(RaftPeer p : newGroup.getPeers()) { - client.reinitialize(newGroup, p.getId()); - } - Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true)); - cluster.shutdown(); - } - - @Test - public void testReinitialize5Nodes() throws Exception { - final int[] idIndex = {3, 4, 5}; - runTestReinitializeMultiGroups(idIndex, 0); - } - - @Test - public void testReinitialize7Nodes() throws Exception { - final int[] idIndex = {1, 6, 7}; - runTestReinitializeMultiGroups(idIndex, 1); - } - - @Test - public void testReinitialize9Nodes() throws Exception { - final int[] idIndex = {5, 8, 9}; - runTestReinitializeMultiGroups(idIndex, 2); - } - - private void runTestReinitializeMultiGroups(int[] idIndex, int chosen) throws Exception { - printThreadCount(null, "init"); - runTestReinitializeMultiGroups(getCluster(0), idIndex, chosen, NOOP); - } - - static final CheckedBiConsumer<MiniRaftCluster, RaftGroup, RuntimeException> NOOP = (c, g) -> {}; - - public static <T extends Throwable> void runTestReinitializeMultiGroups( - MiniRaftCluster cluster, int[] idIndex, int chosen, - CheckedBiConsumer<MiniRaftCluster, RaftGroup, T> checker) - throws IOException, InterruptedException, T { - if (chosen < 0) { - chosen = ThreadLocalRandom.current().nextInt(idIndex.length); - } - final String type = cluster.getClass().getSimpleName() - + Arrays.toString(idIndex) + "chosen=" + chosen; - LOG.info("\n\nrunTestReinitializeMultiGroups with " + type + ": " + cluster.printServers()); - - // Start server with an empty conf - final RaftGroup emptyGroup = new RaftGroup(cluster.getGroupId()); - - final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(idIndex[idIndex.length - 1], 0)) - .map(RaftPeerId::valueOf).collect(Collectors.toList()); - LOG.info("ids: " + ids); - ids.forEach(id -> cluster.putNewServer(id, emptyGroup, true)); - LOG.info("putNewServer: " + cluster.printServers()); - - TimeUnit.SECONDS.sleep(1); - cluster.start(); - - // Make sure that there are no leaders. - TimeUnit.SECONDS.sleep(1); - LOG.info("start: " + cluster.printServers()); - Assert.assertNull(cluster.getLeader()); - - // Reinitialize servers to three groups - final List<RaftPeer> allPeers = cluster.getPeers(); - Collections.sort(allPeers, Comparator.comparing(p -> p.getId().toString())); - final RaftGroup[] groups = new RaftGroup[idIndex.length]; - for (int i = 0; i < idIndex.length; i++) { - final RaftGroupId gid = RaftGroupId.randomId(); - final int previous = i == 0 ? 0 : idIndex[i - 1]; - final RaftPeer[] peers = allPeers.subList(previous, idIndex[i]).toArray(RaftPeer.emptyArray()); - groups[i] = new RaftGroup(gid, peers); - - LOG.info(i + ") starting " + groups[i]); - for(RaftPeer p : peers) { - try(final RaftClient client = cluster.createClient(p.getId(), emptyGroup)) { - client.reinitialize(groups[i], p.getId()); - } - } - Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true, gid)); - checker.accept(cluster, groups[i]); - } - printThreadCount(type, "start groups"); - LOG.info("start groups: " + cluster.printServers()); - - // randomly close two of the groups (i.e. reinitialize to empty peers) - LOG.info("chosen = " + chosen + ", " + groups[chosen]); - - for (int i = 0; i < groups.length; i++) { - if (i != chosen) { - final RaftGroup g = groups[i]; - final RaftGroup newGroup = new RaftGroup(g.getGroupId()); - LOG.info(i + ") close " + cluster.printServers(g.getGroupId())); - for(RaftPeer p : g.getPeers()) { - try (final RaftClient client = cluster.createClient(p.getId(), g)) { - client.reinitialize(newGroup, p.getId()); - } - } - } - } - printThreadCount(type, "close groups"); - LOG.info("close groups: " + cluster.printServers()); - - // update chosen group to use all the peers - final RaftGroup newGroup = new RaftGroup(groups[chosen].getGroupId()); - for(int i = 0; i < groups.length; i++) { - if (i != chosen) { - LOG.info(i + ") reinitialize: " + cluster.printServers(groups[i].getGroupId())); - for (RaftPeer p : groups[i].getPeers()) { - try (final RaftClient client = cluster.createClient(p.getId(), groups[i])) { - client.reinitialize(newGroup, p.getId()); - } - } - } - } - LOG.info(chosen + ") setConfiguration: " + cluster.printServers(groups[chosen].getGroupId())); - try (final RaftClient client = cluster.createClient(groups[chosen])) { - client.setConfiguration(allPeers.toArray(RaftPeer.emptyArray())); - } - - Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true)); - checker.accept(cluster, groups[chosen]); - LOG.info("update groups: " + cluster.printServers()); - printThreadCount(type, "update groups"); - - cluster.shutdown(); - printThreadCount(type, "shutdown"); - } - - static void printThreadCount(String type, String label) { - System.out.println("| " + type + " | " + label + " | " - + JavaUtils.getRootThreadGroup().activeCount() + " |"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java index c4a9a5e..d9bbc43 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java @@ -155,9 +155,9 @@ class SimulatedServerRpc implements RaftServerRpc { public RaftClientReply handleRequest(RaftClientRequest request) throws IOException { final CompletableFuture<RaftClientReply> future; - if (request instanceof ReinitializeRequest) { + if (request instanceof GroupManagementRequest) { future = CompletableFuture.completedFuture( - server.reinitialize((ReinitializeRequest) request)); + server.groupManagement((GroupManagementRequest) request)); } else if (request instanceof ServerInformationRequest) { future = CompletableFuture.completedFuture( server.getInfo((ServerInformationRequest) request)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestGroupManagementWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestGroupManagementWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestGroupManagementWithSimulatedRpc.java new file mode 100644 index 0000000..73fbae9 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestGroupManagementWithSimulatedRpc.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.simulation; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.server.impl.GroupManagementBaseTest; + +public class TestGroupManagementWithSimulatedRpc extends GroupManagementBaseTest { + @Override + public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() { + return MiniRaftClusterWithSimulatedRpc.FACTORY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/09b099c7/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java deleted file mode 100644 index 7fc0c6c..0000000 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.server.simulation; - -import org.apache.ratis.MiniRaftCluster; -import org.apache.ratis.server.impl.ReinitializationBaseTest; - -public class TestReinitializationWithSimulatedRpc extends ReinitializationBaseTest { - @Override - public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() { - return MiniRaftClusterWithSimulatedRpc.FACTORY; - } -}
