This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 1effe6fbc15a42a6bbdca9c94155d54f5b0e3023 Author: Yaolong Liu <[email protected]> AuthorDate: Tue Jul 5 04:17:42 2022 +0800 RATIS-1594. Support ADD mode to SetConfiguration (#658) --- .../java/org/apache/ratis/client/api/AdminApi.java | 22 +++- .../org/apache/ratis/client/impl/AdminImpl.java | 7 +- .../apache/ratis/client/impl/ClientProtoUtils.java | 27 ++++- .../ratis/protocol/SetConfigurationRequest.java | 111 +++++++++++++++++---- ratis-proto/src/main/proto/Raft.proto | 5 + .../apache/ratis/server/impl/LeaderStateImpl.java | 5 +- .../ratis/server/impl/RaftConfigurationImpl.java | 20 +++- .../apache/ratis/server/impl/RaftServerImpl.java | 29 +++++- .../apache/ratis/server/impl/MiniRaftCluster.java | 2 +- .../server/impl/RaftReconfigurationBaseTest.java | 28 ++++++ .../ratis/server/impl/RaftServerTestUtil.java | 3 +- 11 files changed, 217 insertions(+), 42 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java index 706c917b..86e25ef9 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java @@ -20,6 +20,7 @@ package org.apache.ratis.client.api; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.SetConfigurationRequest; import java.io.IOException; import java.util.Arrays; @@ -31,6 +32,9 @@ import java.util.List; * such as setting raft configuration and transferring leadership. */ public interface AdminApi { + RaftClientReply setConfiguration(SetConfigurationRequest.Arguments arguments) + throws IOException; + /** The same as setConfiguration(serversInNewConf, Collections.emptyList()). 
*/ default RaftClientReply setConfiguration(List<RaftPeer> serversInNewConf) throws IOException { return setConfiguration(serversInNewConf, Collections.emptyList()); @@ -38,17 +42,27 @@ public interface AdminApi { /** The same as setConfiguration(Arrays.asList(serversInNewConf)). */ default RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException { - return setConfiguration(Arrays.asList(serversInNewConf)); + return setConfiguration(Arrays.asList(serversInNewConf), Collections.emptyList()); } /** Set the configuration request to the raft service. */ - RaftClientReply setConfiguration(List<RaftPeer> serversInNewConf, List<RaftPeer> listenersInNewConf) - throws IOException; + default RaftClientReply setConfiguration(List<RaftPeer> serversInNewConf, List<RaftPeer> listenersInNewConf) + throws IOException { + return setConfiguration(SetConfigurationRequest.Arguments + .newBuilder() + .setServersInNewConf(serversInNewConf) + .setListenersInNewConf(listenersInNewConf) + .build()); + } /** The same as setConfiguration(Arrays.asList(serversInNewConf), Arrays.asList(listenersInNewConf)). 
*/ default RaftClientReply setConfiguration(RaftPeer[] serversInNewConf, RaftPeer[] listenersInNewConf) throws IOException { - return setConfiguration(Arrays.asList(serversInNewConf), Arrays.asList(listenersInNewConf)); + return setConfiguration(SetConfigurationRequest.Arguments + .newBuilder() + .setServersInNewConf(serversInNewConf) + .setListenersInNewConf(listenersInNewConf) + .build()); } /** Transfer leadership to the given server.*/ diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java index 1bb5cee8..44551543 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java @@ -36,16 +36,15 @@ class AdminImpl implements AdminApi { this.client = Objects.requireNonNull(client, "client == null"); } - @Override - public RaftClientReply setConfiguration(List<RaftPeer> peersInNewConf, List<RaftPeer> listenersInNewConf) - throws IOException { + public RaftClientReply setConfiguration(SetConfigurationRequest.Arguments arguments) throws IOException { + List<RaftPeer> peersInNewConf = arguments.getServersInNewConf(); Objects.requireNonNull(peersInNewConf, "peersInNewConf == null"); final long callId = CallId.getAndIncrement(); // also refresh the rpc proxies for these peers client.getClientRpc().addRaftPeers(peersInNewConf); return client.io().sendRequestWithRetry(() -> new SetConfigurationRequest( - client.getId(), client.getLeaderId(), client.getGroupId(), callId, peersInNewConf, listenersInNewConf)); + client.getId(), client.getLeaderId(), client.getGroupId(), callId, arguments)); } @Override diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 3f8fa459..fd9f49f5 100644 --- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -499,22 +499,39 @@ public interface ClientProtoUtils { static SetConfigurationRequest toSetConfigurationRequest( SetConfigurationRequestProto p) { + final SetConfigurationRequest.Arguments arguments = SetConfigurationRequest.Arguments.newBuilder() + .setServersInNewConf(ProtoUtils.toRaftPeers(p.getPeersList())) + .setListenersInNewConf(ProtoUtils.toRaftPeers(p.getListenersList())) + .setMode(toSetConfigurationMode(p.getMode())) + .build(); final RaftRpcRequestProto m = p.getRpcRequest(); - final List<RaftPeer> peers = ProtoUtils.toRaftPeers(p.getPeersList()); - final List<RaftPeer> listeners = ProtoUtils.toRaftPeers(p.getListenersList()); return new SetConfigurationRequest( ClientId.valueOf(m.getRequestorId()), RaftPeerId.valueOf(m.getReplyId()), ProtoUtils.toRaftGroupId(m.getRaftGroupId()), - p.getRpcRequest().getCallId(), peers, listeners); + p.getRpcRequest().getCallId(), arguments); + } + + static SetConfigurationRequest.Mode toSetConfigurationMode( + SetConfigurationRequestProto.Mode p) { + switch (p) { + case SET_UNCONDITIONALLY: + return SetConfigurationRequest.Mode.SET_UNCONDITIONALLY; + case ADD: + return SetConfigurationRequest.Mode.ADD; + default: + throw new IllegalArgumentException("Unexpected mode " + p); + } } static SetConfigurationRequestProto toSetConfigurationRequestProto( SetConfigurationRequest request) { + final SetConfigurationRequest.Arguments arguments = request.getArguments(); return SetConfigurationRequestProto.newBuilder() .setRpcRequest(toRaftRpcRequestProtoBuilder(request)) - .addAllPeers(ProtoUtils.toRaftPeerProtos(request.getPeersInNewConf())) - .addAllListeners(ProtoUtils.toRaftPeerProtos(request.getListenersInNewConf())) + .addAllPeers(ProtoUtils.toRaftPeerProtos(arguments.getPeersInNewConf(RaftPeerRole.FOLLOWER))) + 
.addAllListeners(ProtoUtils.toRaftPeerProtos(arguments.getPeersInNewConf(RaftPeerRole.LISTENER))) + .setMode(SetConfigurationRequestProto.Mode.valueOf(arguments.getMode().name())) .build(); } diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java index da551f2d..ef3cee92 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java @@ -17,39 +17,116 @@ */ package org.apache.ratis.protocol; +import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.util.Preconditions; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; public class SetConfigurationRequest extends RaftClientRequest { - private final List<RaftPeer> peers; - private final List<RaftPeer> listeners; - public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId, - RaftGroupId groupId, long callId, List<RaftPeer> peers) { - this(clientId, serverId, groupId, callId, peers, Collections.emptyList()); + public enum Mode { + SET_UNCONDITIONALLY, + ADD } - public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId, - RaftGroupId groupId, long callId, List<RaftPeer> peers, List<RaftPeer> listeners) { - super(clientId, serverId, groupId, callId, true, writeRequestType()); - this.peers = peers != null? Collections.unmodifiableList(peers): Collections.emptyList(); - this.listeners = listeners != null? 
Collections.unmodifiableList(listeners) : Collections.emptyList(); - Preconditions.assertUnique(this.peers); - Preconditions.assertUnique(this.listeners); + public static final class Arguments { + private final List<RaftPeer> serversInNewConf; + private final List<RaftPeer> listenersInNewConf; + private final Mode mode; + + private Arguments(List<RaftPeer> serversInNewConf, List<RaftPeer> listenersInNewConf,Mode mode) { + this.serversInNewConf = Optional.ofNullable(serversInNewConf) + .map(Collections::unmodifiableList) + .orElseGet(Collections::emptyList); + this.listenersInNewConf = Optional.ofNullable(listenersInNewConf) + .map(Collections::unmodifiableList) + .orElseGet(Collections::emptyList); + this.mode = mode; + + Preconditions.assertUnique(this.serversInNewConf); + Preconditions.assertUnique(this.listenersInNewConf); + } + + public List<RaftPeer> getPeersInNewConf(RaftProtos.RaftPeerRole role) { + switch (role) { + case FOLLOWER: return serversInNewConf; + case LISTENER: return listenersInNewConf; + default: + throw new IllegalArgumentException("Unexpected role " + role); + } + } + + public List<RaftPeer> getServersInNewConf() { + return serversInNewConf; + } + + public Mode getMode() { + return mode; + } + @Override + public String toString() { + return getMode() + + ", servers:" + getPeersInNewConf(RaftProtos.RaftPeerRole.FOLLOWER) + + ", listeners:" + getPeersInNewConf(RaftProtos.RaftPeerRole.LISTENER); + + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private List<RaftPeer> serversInNewConf; + private List<RaftPeer> listenersInNewConf = Collections.emptyList(); + private Mode mode = Mode.SET_UNCONDITIONALLY; + + public Builder setServersInNewConf(List<RaftPeer> serversInNewConf) { + this.serversInNewConf = serversInNewConf; + return this; + } + + public Builder setListenersInNewConf(List<RaftPeer> listenersInNewConf) { + this.listenersInNewConf = listenersInNewConf; + return this; + } + + public 
Builder setServersInNewConf(RaftPeer[] serversInNewConfArray) { + this.serversInNewConf = Arrays.asList(serversInNewConfArray); + return this; + } + + public Builder setListenersInNewConf(RaftPeer[] listenersInNewConfArray) { + this.listenersInNewConf = Arrays.asList(listenersInNewConfArray); + return this; + } + + public Builder setMode(Mode mode) { + this.mode = mode; + return this; + } + + public Arguments build() { + return new Arguments(serversInNewConf, listenersInNewConf, mode); + } + } } + private final Arguments arguments; - public List<RaftPeer> getPeersInNewConf() { - return peers; + public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId, + RaftGroupId groupId, long callId, Arguments arguments) { + super(clientId, serverId, groupId, callId, true, writeRequestType()); + this.arguments = arguments; } - public List<RaftPeer> getListenersInNewConf() { - return listeners; + public Arguments getArguments() { + return arguments; } @Override public String toString() { - return super.toString() + ", peers:" + getPeersInNewConf(); + return super.toString() + ", " + getArguments(); } } diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index df2c3d88..f6b38c14 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -406,9 +406,14 @@ message RaftClientReplyProto { // setConfiguration request message SetConfigurationRequestProto { + enum Mode { + SET_UNCONDITIONALLY = 0; + ADD = 1; + } RaftRpcRequestProto rpcRequest = 1; repeated RaftPeerProto peers = 2; repeated RaftPeerProto listeners = 3; + optional Mode mode = 4; } // transfer leadership request diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 7265f3a8..648a0eb3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -364,12 +364,11 @@ class LeaderStateImpl implements LeaderState { /** * Start bootstrapping new peers */ - PendingRequest startSetConfiguration(SetConfigurationRequest request) { + PendingRequest startSetConfiguration(SetConfigurationRequest request, List<RaftPeer> peersInNewConf) { LOG.info("{}: startSetConfiguration {}", this, request); Preconditions.assertTrue(running && !inStagingState()); - final List<RaftPeer> peersInNewConf = request.getPeersInNewConf(); - final List<RaftPeer> listenersInNewConf = request.getListenersInNewConf(); + final List<RaftPeer> listenersInNewConf = request.getArguments().getPeersInNewConf(RaftPeerRole.LISTENER); final Collection<RaftPeer> peersToBootStrap = server.getRaftConf().filterNotContainedInConf(peersInNewConf); final Collection<RaftPeer> listenersToBootStrap= server.getRaftConf().filterNotContainedInConf(listenersInNewConf); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java index ed661a99..9f203b23 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java @@ -242,12 +242,26 @@ final class RaftConfigurationImpl implements RaftConfiguration { return logEntryIndex + ": " + conf + ", old=" + oldConf; } - boolean hasNoChange(Collection<RaftPeer> newMembers) { - if (!isStable() || conf.size() != newMembers.size()) { + boolean hasNoChange(Collection<RaftPeer> newMembers, Collection<RaftPeer> newListeners) { + if (!isStable() || conf.size() != newMembers.size() + || conf.getPeers(RaftPeerRole.LISTENER).size() != newListeners.size()) { return false; } for (RaftPeer peer : newMembers) { - if (!conf.contains(peer.getId()) || conf.getPeer(peer.getId()).getPriority() != peer.getPriority()) { + final 
RaftPeer inConf = conf.getPeer(peer.getId()); + if (inConf == null) { + return false; + } + if (inConf.getPriority() != peer.getPriority()) { + return false; + } + } + for (RaftPeer peer : newListeners) { + final RaftPeer inConf = conf.getPeer(peer.getId(), RaftPeerRole.LISTENER); + if (inConf == null) { + return false; + } + if (inConf.getPriority() != peer.getPriority()) { return false; } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index fe54dd08..5b7be2e2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -78,6 +78,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY; import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER; @@ -1086,7 +1087,7 @@ class RaftServerImpl implements RaftServer.Division, return reply; } - final List<RaftPeer> peersInNewConf = request.getPeersInNewConf(); + final SetConfigurationRequest.Arguments arguments = request.getArguments(); final PendingRequest pending; synchronized (this) { reply = checkLeaderState(request, null, false); @@ -1102,20 +1103,40 @@ class RaftServerImpl implements RaftServer.Division, "Reconfiguration is already in progress: " + current); } + final List<RaftPeer> serversInNewConf; + final List<RaftPeer> listenersInNewConf; + if (arguments.getMode() == SetConfigurationRequest.Mode.ADD) { + serversInNewConf = add(RaftPeerRole.FOLLOWER, current, arguments); + listenersInNewConf = add(RaftPeerRole.LISTENER, current, arguments); + } else { + serversInNewConf = 
arguments.getPeersInNewConf(RaftPeerRole.FOLLOWER); + listenersInNewConf = arguments.getPeersInNewConf(RaftPeerRole.LISTENER); + } + // return success with a null message if the new conf is the same as the current - if (current.hasNoChange(peersInNewConf)) { + if (current.hasNoChange(serversInNewConf, listenersInNewConf)) { pending = new PendingRequest(request); pending.setReply(newSuccessReply(request)); return pending.getFuture(); } - getRaftServer().addRaftPeers(peersInNewConf); + getRaftServer().addRaftPeers(serversInNewConf); // add staging state into the leaderState - pending = leaderState.startSetConfiguration(request); + pending = leaderState.startSetConfiguration(request, serversInNewConf); } return pending.getFuture(); } + static List<RaftPeer> add(RaftPeerRole role, RaftConfigurationImpl conf, SetConfigurationRequest.Arguments args) { + final Map<RaftPeerId, RaftPeer> inConfs = conf.getAllPeers(role).stream() + .collect(Collectors.toMap(RaftPeer::getId, Function.identity())); + + final List<RaftPeer> toAdds = args.getPeersInNewConf(role); + toAdds.stream().map(RaftPeer::getId).forEach(inConfs::remove); + + return Stream.concat(toAdds.stream(), inConfs.values().stream()).collect(Collectors.toList()); + } + /** * check if the remote peer is not included in the current conf * and should shutdown. should shutdown if all the following stands: diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 6ac14d73..61db4936 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -740,7 +740,7 @@ public abstract class MiniRaftCluster implements Closeable { ClientId clientId, RaftPeerId leaderId, RaftPeer... 
peers) { return new SetConfigurationRequest(clientId, leaderId, getGroupId(), CallId.getDefault(), - Arrays.asList(peers), Collections.emptyList()); + SetConfigurationRequest.Arguments.newBuilder().setServersInNewConf(peers).build()); } public void setConfiguration(RaftPeer... peers) throws IOException { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index f6580e84..d764988e 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -190,6 +190,34 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste waitAndCheckNewConf(cluster, allPeers, 2, null); } + @Test + public void testSetConfigurationInAddMode() throws Exception { + runWithNewCluster(2, this::runTestSetConfigurationInAddMode); + } + + private void runTestSetConfigurationInAddMode(CLUSTER cluster) throws Exception { + final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + + PeerChanges change = cluster.addNewPeers(1, true); + List<RaftPeer> peers = Arrays.asList(change.newPeers); + + try (final RaftClient client = cluster.createClient(leader.getId())) { + for (int i = 0; i < 10; i++) { + RaftClientReply reply = client.io().send(new SimpleMessage("m" + i)); + Assert.assertTrue(reply.isSuccess()); + } + RaftClientReply reply = client.admin().setConfiguration( + SetConfigurationRequest.Arguments.newBuilder() + .setServersInNewConf(peers) + .setMode(SetConfigurationRequest.Mode.ADD).build()); + Assert.assertTrue(reply.isSuccess()); + waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); + } + cluster.close(); + } + + + @Test(timeout = 30000) public void testReconfTwice() throws Exception { runWithNewCluster(3, this::runTestReconfTwice); diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index f3cc38c9..e6e60eb1 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; @@ -99,7 +100,7 @@ public class RaftServerTestUtil { if (current.containsInConf(server.getId())) { numIncluded++; Assert.assertTrue(conf.isStable()); - Assert.assertTrue(conf.hasNoChange(peers)); + Assert.assertTrue(conf.hasNoChange(peers, Collections.emptyList())); } else if (server.getInfo().isAlive()) { // The server is successfully removed from the conf // It may not be shutdown since it may not be able to talk to the new leader (who is not in its conf).
