This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 5437186dc684f00b10b8c007528fe2233e787660 Author: Yaolong Liu <[email protected]> AuthorDate: Tue Jul 19 09:56:21 2022 +0800 RATIS-1593. Support CAS mode to setConfiguration (#682) (cherry picked from commit 6d2580f69fdefa87e23f633e0f3a2c8fbc6a1d68) --- .../org/apache/ratis/client/impl/BlockingImpl.java | 4 ++- .../apache/ratis/client/impl/ClientProtoUtils.java | 6 ++++ .../ratis/protocol/SetConfigurationRequest.java | 39 ++++++++++++++++++++-- .../exceptions/SetConfigurationException.java | 29 ++++++++++++++++ .../org/apache/ratis/util/CollectionUtils.java | 18 ++++++++++ ratis-proto/src/main/proto/Raft.proto | 3 ++ .../ratis/server/impl/RaftConfigurationImpl.java | 4 +-- .../apache/ratis/server/impl/RaftServerImpl.java | 14 ++++++++ .../server/impl/RaftReconfigurationBaseTest.java | 39 ++++++++++++++++++++++ 9 files changed, 150 insertions(+), 6 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java index e2347140..49bea573 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java @@ -33,6 +33,7 @@ import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.exceptions.AlreadyClosedException; import org.apache.ratis.protocol.exceptions.AlreadyExistsException; +import org.apache.ratis.protocol.exceptions.SetConfigurationException; import org.apache.ratis.protocol.exceptions.GroupMismatchException; import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException; import org.apache.ratis.protocol.exceptions.StateMachineException; @@ -101,7 +102,8 @@ class BlockingImpl implements BlockingApi { return client.handleReply(request, reply); } } catch (GroupMismatchException | StateMachineException | TransferLeadershipException | - LeaderSteppingDownException | AlreadyClosedException | AlreadyExistsException e) { + LeaderSteppingDownException | AlreadyClosedException | AlreadyExistsException | + SetConfigurationException e) { throw e; } catch (IOException e) { ioe = e; diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index fd9f49f5..859e1d4f 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -502,6 +502,8 @@ public interface ClientProtoUtils { final SetConfigurationRequest.Arguments arguments = SetConfigurationRequest.Arguments.newBuilder() .setServersInNewConf(ProtoUtils.toRaftPeers(p.getPeersList())) .setListenersInNewConf(ProtoUtils.toRaftPeers(p.getListenersList())) + .setServersInCurrentConf(ProtoUtils.toRaftPeers(p.getCurrentPeersList())) + .setListenersInCurrentConf(ProtoUtils.toRaftPeers(p.getCurrentListenersList())) .setMode(toSetConfigurationMode(p.getMode())) .build(); final RaftRpcRequestProto m = p.getRpcRequest(); @@ -519,6 +521,8 @@ public interface ClientProtoUtils { return SetConfigurationRequest.Mode.SET_UNCONDITIONALLY; case ADD: return SetConfigurationRequest.Mode.ADD; + case COMPARE_AND_SET: + return SetConfigurationRequest.Mode.COMPARE_AND_SET; default: throw new IllegalArgumentException("Unexpected mode " + p); } @@ -531,6 +535,8 @@ public interface ClientProtoUtils { .setRpcRequest(toRaftRpcRequestProtoBuilder(request)) .addAllPeers(ProtoUtils.toRaftPeerProtos(arguments.getPeersInNewConf(RaftPeerRole.FOLLOWER))) .addAllListeners(ProtoUtils.toRaftPeerProtos(arguments.getPeersInNewConf(RaftPeerRole.LISTENER))) + .addAllCurrentPeers(ProtoUtils.toRaftPeerProtos(arguments.getServersInCurrentConf())) + .addAllCurrentListeners(ProtoUtils.toRaftPeerProtos(arguments.getListenersInCurrentConf())) .setMode(SetConfigurationRequestProto.Mode.valueOf(arguments.getMode().name())) .build(); } diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java index ef3cee92..e5e8236b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java @@ -29,25 +29,37 @@ public class SetConfigurationRequest extends RaftClientRequest { public enum Mode { SET_UNCONDITIONALLY, - ADD + ADD, + COMPARE_AND_SET } public static final class Arguments { private final List<RaftPeer> serversInNewConf; private final List<RaftPeer> listenersInNewConf; + private final List<RaftPeer> serversInCurrentConf; + private final List<RaftPeer> listenersInCurrentConf; private final Mode mode; - private Arguments(List<RaftPeer> serversInNewConf, List<RaftPeer> listenersInNewConf,Mode mode) { + private Arguments(List<RaftPeer> serversInNewConf, List<RaftPeer> listenersInNewConf, Mode mode, + List<RaftPeer> serversInCurrentConf, List<RaftPeer> listenersInCurrentConf) { this.serversInNewConf = Optional.ofNullable(serversInNewConf) .map(Collections::unmodifiableList) .orElseGet(Collections::emptyList); this.listenersInNewConf = Optional.ofNullable(listenersInNewConf) .map(Collections::unmodifiableList) .orElseGet(Collections::emptyList); + this.serversInCurrentConf = Optional.ofNullable(serversInCurrentConf) + .map(Collections::unmodifiableList) + .orElseGet(Collections::emptyList); + this.listenersInCurrentConf = Optional.ofNullable(listenersInCurrentConf) + .map(Collections::unmodifiableList) + .orElseGet(Collections::emptyList); this.mode = mode; Preconditions.assertUnique(serversInNewConf); Preconditions.assertUnique(listenersInNewConf); + Preconditions.assertUnique(serversInCurrentConf); + Preconditions.assertUnique(listenersInCurrentConf); } public List<RaftPeer> getPeersInNewConf(RaftProtos.RaftPeerRole role) { @@ -59,6 +71,14 @@ public class SetConfigurationRequest extends RaftClientRequest { } } + public List<RaftPeer> getListenersInCurrentConf() { + return listenersInCurrentConf; + } + + public List<RaftPeer> getServersInCurrentConf() { + return serversInCurrentConf; + } + public List<RaftPeer> getServersInNewConf() { return serversInNewConf; } @@ -81,6 +101,8 @@ public class SetConfigurationRequest extends RaftClientRequest { public static class Builder { private List<RaftPeer> serversInNewConf; private List<RaftPeer> listenersInNewConf = Collections.emptyList(); + private List<RaftPeer> serversInCurrentConf = Collections.emptyList(); + private List<RaftPeer> listenersInCurrentConf = Collections.emptyList(); private Mode mode = Mode.SET_UNCONDITIONALLY; public Builder setServersInNewConf(List<RaftPeer> serversInNewConf) { @@ -103,13 +125,24 @@ public class SetConfigurationRequest extends RaftClientRequest { return this; } + public Builder setServersInCurrentConf(List<RaftPeer> serversInCurrentConf) { + this.serversInCurrentConf = serversInCurrentConf; + return this; + } + + public Builder setListenersInCurrentConf(List<RaftPeer> listenersInCurrentConf) { + this.listenersInCurrentConf = listenersInCurrentConf; + return this; + } + public Builder setMode(Mode mode) { this.mode = mode; return this; } public Arguments build() { - return new Arguments(serversInNewConf, listenersInNewConf, mode); + return new Arguments(serversInNewConf, listenersInNewConf, mode, serversInCurrentConf, + listenersInCurrentConf); } } } diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/SetConfigurationException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/SetConfigurationException.java new file mode 100644 index 00000000..c7593b80 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/SetConfigurationException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol.exceptions; + +public class SetConfigurationException extends RaftException { + + public SetConfigurationException(String message) { + super(message); + } + + public SetConfigurationException(String message, Throwable t) { + super(message, t); + } +} diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java index 39f932ca..cdfd9635 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java @@ -140,4 +140,22 @@ public interface CollectionUtils { return computeIfAbsent(map, key, supplier, () -> { }); } + + static <V> boolean equalsIgnoreOrder(List<V> left, List<V> right, Comparator<V> comparator) { + if (left == right) { + return true; + } else if (left == null || right == null) { + // only one of them is null (cannot be both null since they are unequal) + return false; + } + final int n = right.size(); + if (left.size() != n) { + return false; + } + left = new ArrayList<>(left); + left.sort(comparator); + right = new ArrayList<>(right); + right.sort(comparator); + return left.equals(right); + } } diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index fa3d15e2..c571f0c7 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -410,11 +410,14 @@ message SetConfigurationRequestProto { enum Mode { SET_UNCONDITIONALLY = 0; ADD = 1; + COMPARE_AND_SET = 2; } RaftRpcRequestProto rpcRequest = 1; repeated RaftPeerProto peers = 2; repeated RaftPeerProto listeners = 3; optional Mode mode = 4; + repeated RaftPeerProto currentPeers = 5; + repeated RaftPeerProto currentListeners = 6; } // transfer leadership request diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java index 9f203b23..ee01f275 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java @@ -200,8 +200,8 @@ final class RaftConfigurationImpl implements RaftConfiguration { } @Override - public Collection<RaftPeer> getAllPeers(RaftPeerRole role) { - final Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers(role)); + public List<RaftPeer> getAllPeers(RaftPeerRole role) { + final List<RaftPeer> peers = new ArrayList<>(conf.getPeers(role)); if (oldConf != null) { oldConf.getPeers(role).stream() .filter(p -> !peers.contains(p)) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 9268b99a..d4711fbd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -23,6 +23,7 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.proto.RaftProtos.*; import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase; import org.apache.ratis.protocol.*; +import org.apache.ratis.protocol.exceptions.SetConfigurationException; import org.apache.ratis.protocol.exceptions.GroupMismatchException; import org.apache.ratis.protocol.exceptions.LeaderNotReadyException; import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException; @@ -1118,6 +1119,19 @@ class RaftServerImpl implements RaftServer.Division, if (arguments.getMode() == SetConfigurationRequest.Mode.ADD) { serversInNewConf = add(RaftPeerRole.FOLLOWER, current, arguments); listenersInNewConf = add(RaftPeerRole.LISTENER, current, arguments); + } else if (arguments.getMode() == SetConfigurationRequest.Mode.COMPARE_AND_SET) { + final Comparator<RaftPeer> comparator = Comparator.comparing(RaftPeer::getId, + Comparator.comparing(RaftPeerId::toString)); + if (CollectionUtils.equalsIgnoreOrder(arguments.getServersInCurrentConf(), + current.getAllPeers(RaftPeerRole.FOLLOWER), comparator) + && CollectionUtils.equalsIgnoreOrder(arguments.getListenersInCurrentConf(), + current.getAllPeers(RaftPeerRole.LISTENER), comparator)) { + serversInNewConf = arguments.getPeersInNewConf(RaftPeerRole.FOLLOWER); + listenersInNewConf = arguments.getPeersInNewConf(RaftPeerRole.LISTENER); + } else { + throw new SetConfigurationException("Failed to set configuration: current configuration " + + current + " is different than the request " + request); + } } else { serversInNewConf = arguments.getPeersInNewConf(RaftPeerRole.FOLLOWER); listenersInNewConf = arguments.getPeersInNewConf(RaftPeerRole.LISTENER); diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index d9b4e93a..eac63d0b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -31,6 +31,7 @@ import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.protocol.exceptions.SetConfigurationException; import org.apache.ratis.protocol.exceptions.LeaderNotReadyException; import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException; import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException; @@ -216,6 +217,44 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste cluster.close(); } + @Test + public void testSetConfigurationInCasMode() throws Exception { + runWithNewCluster(2, this::runTestSetConfigurationInCasMode); + } + + private void runTestSetConfigurationInCasMode(CLUSTER cluster) throws Exception { + final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); + List<RaftPeer> oldPeers = cluster.getPeers(); + + PeerChanges change = cluster.addNewPeers(1, true); + List<RaftPeer> peers = Arrays.asList(change.allPeersInNewConf); + + try (final RaftClient client = cluster.createClient(leader.getId())) { + for (int i = 0; i < 10; i++) { + RaftClientReply reply = client.io().send(new SimpleMessage("m" + i)); + Assert.assertTrue(reply.isSuccess()); + } + + testFailureCase("Can't set configuration in CAS mode ", + () -> client.admin().setConfiguration(SetConfigurationRequest.Arguments.newBuilder() + .setServersInNewConf(peers) + .setServersInCurrentConf(peers) + .setMode(SetConfigurationRequest.Mode.COMPARE_AND_SET) + .build()), SetConfigurationException.class); + + Collections.shuffle(oldPeers); + RaftClientReply reply = client.admin().setConfiguration( + SetConfigurationRequest.Arguments.newBuilder() + .setServersInNewConf(peers) + .setServersInCurrentConf(oldPeers) + .setMode(SetConfigurationRequest.Mode.COMPARE_AND_SET) + .build()); + Assert.assertTrue(reply.isSuccess()); + waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); + } + cluster.close(); + } + @Test(timeout = 30000)
