This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 6d2580f6 RATIS-1593. Support CAS mode to setConfiguration (#682)
6d2580f6 is described below
commit 6d2580f69fdefa87e23f633e0f3a2c8fbc6a1d68
Author: Yaolong Liu <[email protected]>
AuthorDate: Tue Jul 19 09:56:21 2022 +0800
RATIS-1593. Support CAS mode to setConfiguration (#682)
---
.../org/apache/ratis/client/impl/BlockingImpl.java | 4 ++-
.../apache/ratis/client/impl/ClientProtoUtils.java | 6 ++++
.../ratis/protocol/SetConfigurationRequest.java | 39 ++++++++++++++++++++--
.../exceptions/SetConfigurationException.java | 29 ++++++++++++++++
.../org/apache/ratis/util/CollectionUtils.java | 18 ++++++++++
ratis-proto/src/main/proto/Raft.proto | 3 ++
.../ratis/server/impl/RaftConfigurationImpl.java | 4 +--
.../apache/ratis/server/impl/RaftServerImpl.java | 14 ++++++++
.../server/impl/RaftReconfigurationBaseTest.java | 39 ++++++++++++++++++++++
9 files changed, 150 insertions(+), 6 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index e2347140..49bea573 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -33,6 +33,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
+import org.apache.ratis.protocol.exceptions.SetConfigurationException;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
@@ -101,7 +102,8 @@ class BlockingImpl implements BlockingApi {
return client.handleReply(request, reply);
}
} catch (GroupMismatchException | StateMachineException |
TransferLeadershipException |
- LeaderSteppingDownException | AlreadyClosedException |
AlreadyExistsException e) {
+ LeaderSteppingDownException | AlreadyClosedException |
AlreadyExistsException |
+ SetConfigurationException e) {
throw e;
} catch (IOException e) {
ioe = e;
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index fd9f49f5..859e1d4f 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -502,6 +502,8 @@ public interface ClientProtoUtils {
final SetConfigurationRequest.Arguments arguments =
SetConfigurationRequest.Arguments.newBuilder()
.setServersInNewConf(ProtoUtils.toRaftPeers(p.getPeersList()))
.setListenersInNewConf(ProtoUtils.toRaftPeers(p.getListenersList()))
+
.setServersInCurrentConf(ProtoUtils.toRaftPeers(p.getCurrentPeersList()))
+
.setListenersInCurrentConf(ProtoUtils.toRaftPeers(p.getCurrentListenersList()))
.setMode(toSetConfigurationMode(p.getMode()))
.build();
final RaftRpcRequestProto m = p.getRpcRequest();
@@ -519,6 +521,8 @@ public interface ClientProtoUtils {
return SetConfigurationRequest.Mode.SET_UNCONDITIONALLY;
case ADD:
return SetConfigurationRequest.Mode.ADD;
+ case COMPARE_AND_SET:
+ return SetConfigurationRequest.Mode.COMPARE_AND_SET;
default:
throw new IllegalArgumentException("Unexpected mode " + p);
}
@@ -531,6 +535,8 @@ public interface ClientProtoUtils {
.setRpcRequest(toRaftRpcRequestProtoBuilder(request))
.addAllPeers(ProtoUtils.toRaftPeerProtos(arguments.getPeersInNewConf(RaftPeerRole.FOLLOWER)))
.addAllListeners(ProtoUtils.toRaftPeerProtos(arguments.getPeersInNewConf(RaftPeerRole.LISTENER)))
+
.addAllCurrentPeers(ProtoUtils.toRaftPeerProtos(arguments.getServersInCurrentConf()))
+
.addAllCurrentListeners(ProtoUtils.toRaftPeerProtos(arguments.getListenersInCurrentConf()))
.setMode(SetConfigurationRequestProto.Mode.valueOf(arguments.getMode().name()))
.build();
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
index ef3cee92..e5e8236b 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -29,25 +29,37 @@ public class SetConfigurationRequest extends
RaftClientRequest {
public enum Mode {
SET_UNCONDITIONALLY,
- ADD
+ ADD,
+ COMPARE_AND_SET
}
public static final class Arguments {
private final List<RaftPeer> serversInNewConf;
private final List<RaftPeer> listenersInNewConf;
+ private final List<RaftPeer> serversInCurrentConf;
+ private final List<RaftPeer> listenersInCurrentConf;
private final Mode mode;
- private Arguments(List<RaftPeer> serversInNewConf, List<RaftPeer>
listenersInNewConf,Mode mode) {
+ private Arguments(List<RaftPeer> serversInNewConf, List<RaftPeer>
listenersInNewConf, Mode mode,
+ List<RaftPeer> serversInCurrentConf, List<RaftPeer>
listenersInCurrentConf) {
this.serversInNewConf = Optional.ofNullable(serversInNewConf)
.map(Collections::unmodifiableList)
.orElseGet(Collections::emptyList);
this.listenersInNewConf = Optional.ofNullable(listenersInNewConf)
.map(Collections::unmodifiableList)
.orElseGet(Collections::emptyList);
+ this.serversInCurrentConf = Optional.ofNullable(serversInCurrentConf)
+ .map(Collections::unmodifiableList)
+ .orElseGet(Collections::emptyList);
+ this.listenersInCurrentConf = Optional.ofNullable(listenersInCurrentConf)
+ .map(Collections::unmodifiableList)
+ .orElseGet(Collections::emptyList);
this.mode = mode;
Preconditions.assertUnique(serversInNewConf);
Preconditions.assertUnique(listenersInNewConf);
+ Preconditions.assertUnique(serversInCurrentConf);
+ Preconditions.assertUnique(listenersInCurrentConf);
}
public List<RaftPeer> getPeersInNewConf(RaftProtos.RaftPeerRole role) {
@@ -59,6 +71,14 @@ public class SetConfigurationRequest extends
RaftClientRequest {
}
}
+ public List<RaftPeer> getListenersInCurrentConf() {
+ return listenersInCurrentConf;
+ }
+
+ public List<RaftPeer> getServersInCurrentConf() {
+ return serversInCurrentConf;
+ }
+
public List<RaftPeer> getServersInNewConf() {
return serversInNewConf;
}
@@ -81,6 +101,8 @@ public class SetConfigurationRequest extends
RaftClientRequest {
public static class Builder {
private List<RaftPeer> serversInNewConf;
private List<RaftPeer> listenersInNewConf = Collections.emptyList();
+ private List<RaftPeer> serversInCurrentConf = Collections.emptyList();
+ private List<RaftPeer> listenersInCurrentConf = Collections.emptyList();
private Mode mode = Mode.SET_UNCONDITIONALLY;
public Builder setServersInNewConf(List<RaftPeer> serversInNewConf) {
@@ -103,13 +125,24 @@ public class SetConfigurationRequest extends
RaftClientRequest {
return this;
}
+ public Builder setServersInCurrentConf(List<RaftPeer>
serversInCurrentConf) {
+ this.serversInCurrentConf = serversInCurrentConf;
+ return this;
+ }
+
+ public Builder setListenersInCurrentConf(List<RaftPeer>
listenersInCurrentConf) {
+ this.listenersInCurrentConf = listenersInCurrentConf;
+ return this;
+ }
+
public Builder setMode(Mode mode) {
this.mode = mode;
return this;
}
public Arguments build() {
- return new Arguments(serversInNewConf, listenersInNewConf, mode);
+ return new Arguments(serversInNewConf, listenersInNewConf, mode,
serversInCurrentConf,
+ listenersInCurrentConf);
}
}
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/SetConfigurationException.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/SetConfigurationException.java
new file mode 100644
index 00000000..c7593b80
--- /dev/null
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/SetConfigurationException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol.exceptions;
+
+public class SetConfigurationException extends RaftException {
+
+ public SetConfigurationException(String message) {
+ super(message);
+ }
+
+ public SetConfigurationException(String message, Throwable t) {
+ super(message, t);
+ }
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index 39f932ca..cdfd9635 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -140,4 +140,22 @@ public interface CollectionUtils {
return computeIfAbsent(map, key, supplier, () -> {
});
}
+
+ static <V> boolean equalsIgnoreOrder(List<V> left, List<V> right,
Comparator<V> comparator) {
+ if (left == right) {
+ return true;
+ } else if (left == null || right == null) {
+ // only one of them is null (cannot be both null since they are unequal)
+ return false;
+ }
+ final int n = right.size();
+ if (left.size() != n) {
+ return false;
+ }
+ left = new ArrayList<>(left);
+ left.sort(comparator);
+ right = new ArrayList<>(right);
+ right.sort(comparator);
+ return left.equals(right);
+ }
}
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index fa3d15e2..c571f0c7 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -410,11 +410,14 @@ message SetConfigurationRequestProto {
enum Mode {
SET_UNCONDITIONALLY = 0;
ADD = 1;
+ COMPARE_AND_SET = 2;
}
RaftRpcRequestProto rpcRequest = 1;
repeated RaftPeerProto peers = 2;
repeated RaftPeerProto listeners = 3;
optional Mode mode = 4;
+ repeated RaftPeerProto currentPeers = 5;
+ repeated RaftPeerProto currentListeners = 6;
}
// transfer leadership request
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
index 9f203b23..ee01f275 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
@@ -200,8 +200,8 @@ final class RaftConfigurationImpl implements
RaftConfiguration {
}
@Override
- public Collection<RaftPeer> getAllPeers(RaftPeerRole role) {
- final Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers(role));
+ public List<RaftPeer> getAllPeers(RaftPeerRole role) {
+ final List<RaftPeer> peers = new ArrayList<>(conf.getPeers(role));
if (oldConf != null) {
oldConf.getPeers(role).stream()
.filter(p -> !peers.contains(p))
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 9268b99a..d4711fbd 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.exceptions.SetConfigurationException;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
@@ -1118,6 +1119,19 @@ class RaftServerImpl implements RaftServer.Division,
if (arguments.getMode() == SetConfigurationRequest.Mode.ADD) {
serversInNewConf = add(RaftPeerRole.FOLLOWER, current, arguments);
listenersInNewConf = add(RaftPeerRole.LISTENER, current, arguments);
+ } else if (arguments.getMode() ==
SetConfigurationRequest.Mode.COMPARE_AND_SET) {
+ final Comparator<RaftPeer> comparator =
Comparator.comparing(RaftPeer::getId,
+ Comparator.comparing(RaftPeerId::toString));
+ if
(CollectionUtils.equalsIgnoreOrder(arguments.getServersInCurrentConf(),
+ current.getAllPeers(RaftPeerRole.FOLLOWER), comparator)
+ &&
CollectionUtils.equalsIgnoreOrder(arguments.getListenersInCurrentConf(),
+ current.getAllPeers(RaftPeerRole.LISTENER), comparator)) {
+ serversInNewConf =
arguments.getPeersInNewConf(RaftPeerRole.FOLLOWER);
+ listenersInNewConf =
arguments.getPeersInNewConf(RaftPeerRole.LISTENER);
+ } else {
+ throw new SetConfigurationException("Failed to set configuration:
current configuration "
+ + current + " is different than the request " + request);
+ }
} else {
serversInNewConf = arguments.getPeersInNewConf(RaftPeerRole.FOLLOWER);
listenersInNewConf =
arguments.getPeersInNewConf(RaftPeerRole.LISTENER);
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index d9b4e93a..eac63d0b 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -31,6 +31,7 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.exceptions.SetConfigurationException;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
@@ -216,6 +217,44 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER
extends MiniRaftCluste
cluster.close();
}
+ @Test
+ public void testSetConfigurationInCasMode() throws Exception {
+ runWithNewCluster(2, this::runTestSetConfigurationInCasMode);
+ }
+
+ private void runTestSetConfigurationInCasMode(CLUSTER cluster) throws
Exception {
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+ List<RaftPeer> oldPeers = cluster.getPeers();
+
+ PeerChanges change = cluster.addNewPeers(1, true);
+ List<RaftPeer> peers = Arrays.asList(change.allPeersInNewConf);
+
+ try (final RaftClient client = cluster.createClient(leader.getId())) {
+ for (int i = 0; i < 10; i++) {
+ RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
+ Assert.assertTrue(reply.isSuccess());
+ }
+
+ testFailureCase("Can't set configuration in CAS mode ",
+ () ->
client.admin().setConfiguration(SetConfigurationRequest.Arguments.newBuilder()
+ .setServersInNewConf(peers)
+ .setServersInCurrentConf(peers)
+ .setMode(SetConfigurationRequest.Mode.COMPARE_AND_SET)
+ .build()), SetConfigurationException.class);
+
+ Collections.shuffle(oldPeers);
+ RaftClientReply reply = client.admin().setConfiguration(
+ SetConfigurationRequest.Arguments.newBuilder()
+ .setServersInNewConf(peers)
+ .setServersInCurrentConf(oldPeers)
+ .setMode(SetConfigurationRequest.Mode.COMPARE_AND_SET)
+ .build());
+ Assert.assertTrue(reply.isSuccess());
+ waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
+ }
+ cluster.close();
+ }
+
@Test(timeout = 30000)