This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d2580f6 RATIS-1593. Support CAS mode to setConfiguration (#682)
6d2580f6 is described below

commit 6d2580f69fdefa87e23f633e0f3a2c8fbc6a1d68
Author: Yaolong Liu <[email protected]>
AuthorDate: Tue Jul 19 09:56:21 2022 +0800

    RATIS-1593. Support CAS mode to setConfiguration (#682)
---
 .../org/apache/ratis/client/impl/BlockingImpl.java |  4 ++-
 .../apache/ratis/client/impl/ClientProtoUtils.java |  6 ++++
 .../ratis/protocol/SetConfigurationRequest.java    | 39 ++++++++++++++++++++--
 .../exceptions/SetConfigurationException.java      | 29 ++++++++++++++++
 .../org/apache/ratis/util/CollectionUtils.java     | 18 ++++++++++
 ratis-proto/src/main/proto/Raft.proto              |  3 ++
 .../ratis/server/impl/RaftConfigurationImpl.java   |  4 +--
 .../apache/ratis/server/impl/RaftServerImpl.java   | 14 ++++++++
 .../server/impl/RaftReconfigurationBaseTest.java   | 39 ++++++++++++++++++++++
 9 files changed, 150 insertions(+), 6 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index e2347140..49bea573 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -33,6 +33,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
+import org.apache.ratis.protocol.exceptions.SetConfigurationException;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
@@ -101,7 +102,8 @@ class BlockingImpl implements BlockingApi {
           return client.handleReply(request, reply);
         }
       } catch (GroupMismatchException | StateMachineException | 
TransferLeadershipException |
-          LeaderSteppingDownException | AlreadyClosedException | 
AlreadyExistsException e) {
+               LeaderSteppingDownException | AlreadyClosedException | 
AlreadyExistsException |
+               SetConfigurationException e) {
         throw e;
       } catch (IOException e) {
         ioe = e;
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index fd9f49f5..859e1d4f 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -502,6 +502,8 @@ public interface ClientProtoUtils {
     final SetConfigurationRequest.Arguments arguments = 
SetConfigurationRequest.Arguments.newBuilder()
         .setServersInNewConf(ProtoUtils.toRaftPeers(p.getPeersList()))
         .setListenersInNewConf(ProtoUtils.toRaftPeers(p.getListenersList()))
+        
.setServersInCurrentConf(ProtoUtils.toRaftPeers(p.getCurrentPeersList()))
+        
.setListenersInCurrentConf(ProtoUtils.toRaftPeers(p.getCurrentListenersList()))
         .setMode(toSetConfigurationMode(p.getMode()))
         .build();
     final RaftRpcRequestProto m = p.getRpcRequest();
@@ -519,6 +521,8 @@ public interface ClientProtoUtils {
         return SetConfigurationRequest.Mode.SET_UNCONDITIONALLY;
       case ADD:
         return SetConfigurationRequest.Mode.ADD;
+      case COMPARE_AND_SET:
+        return SetConfigurationRequest.Mode.COMPARE_AND_SET;
       default:
         throw new IllegalArgumentException("Unexpected mode " + p);
     }
@@ -531,6 +535,8 @@ public interface ClientProtoUtils {
         .setRpcRequest(toRaftRpcRequestProtoBuilder(request))
         
.addAllPeers(ProtoUtils.toRaftPeerProtos(arguments.getPeersInNewConf(RaftPeerRole.FOLLOWER)))
         
.addAllListeners(ProtoUtils.toRaftPeerProtos(arguments.getPeersInNewConf(RaftPeerRole.LISTENER)))
+        
.addAllCurrentPeers(ProtoUtils.toRaftPeerProtos(arguments.getServersInCurrentConf()))
+        
.addAllCurrentListeners(ProtoUtils.toRaftPeerProtos(arguments.getListenersInCurrentConf()))
         
.setMode(SetConfigurationRequestProto.Mode.valueOf(arguments.getMode().name()))
         .build();
   }
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
index ef3cee92..e5e8236b 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -29,25 +29,37 @@ public class SetConfigurationRequest extends 
RaftClientRequest {
 
   public enum Mode {
     SET_UNCONDITIONALLY,
-    ADD
+    ADD,
+    COMPARE_AND_SET
   }
 
   public static final class Arguments {
     private final List<RaftPeer> serversInNewConf;
     private final List<RaftPeer> listenersInNewConf;
+    private final List<RaftPeer> serversInCurrentConf;
+    private final List<RaftPeer> listenersInCurrentConf;
     private final Mode mode;
 
-    private Arguments(List<RaftPeer> serversInNewConf, List<RaftPeer> 
listenersInNewConf,Mode mode) {
+    private Arguments(List<RaftPeer> serversInNewConf, List<RaftPeer> 
listenersInNewConf, Mode mode,
+        List<RaftPeer> serversInCurrentConf, List<RaftPeer> 
listenersInCurrentConf) {
       this.serversInNewConf = Optional.ofNullable(serversInNewConf)
           .map(Collections::unmodifiableList)
           .orElseGet(Collections::emptyList);
       this.listenersInNewConf = Optional.ofNullable(listenersInNewConf)
           .map(Collections::unmodifiableList)
           .orElseGet(Collections::emptyList);
+      this.serversInCurrentConf = Optional.ofNullable(serversInCurrentConf)
+          .map(Collections::unmodifiableList)
+          .orElseGet(Collections::emptyList);
+      this.listenersInCurrentConf = Optional.ofNullable(listenersInCurrentConf)
+          .map(Collections::unmodifiableList)
+          .orElseGet(Collections::emptyList);
       this.mode = mode;
 
       Preconditions.assertUnique(serversInNewConf);
       Preconditions.assertUnique(listenersInNewConf);
+      Preconditions.assertUnique(serversInCurrentConf);
+      Preconditions.assertUnique(listenersInCurrentConf);
     }
 
     public List<RaftPeer> getPeersInNewConf(RaftProtos.RaftPeerRole role) {
@@ -59,6 +71,14 @@ public class SetConfigurationRequest extends 
RaftClientRequest {
       }
     }
 
+    public List<RaftPeer> getListenersInCurrentConf() {
+      return listenersInCurrentConf;
+    }
+
+    public List<RaftPeer> getServersInCurrentConf() {
+      return serversInCurrentConf;
+    }
+
     public List<RaftPeer> getServersInNewConf() {
       return serversInNewConf;
     }
@@ -81,6 +101,8 @@ public class SetConfigurationRequest extends 
RaftClientRequest {
     public static class Builder {
       private List<RaftPeer> serversInNewConf;
       private List<RaftPeer> listenersInNewConf = Collections.emptyList();
+      private List<RaftPeer> serversInCurrentConf = Collections.emptyList();
+      private List<RaftPeer> listenersInCurrentConf = Collections.emptyList();
       private Mode mode = Mode.SET_UNCONDITIONALLY;
 
       public Builder setServersInNewConf(List<RaftPeer> serversInNewConf) {
@@ -103,13 +125,24 @@ public class SetConfigurationRequest extends 
RaftClientRequest {
         return this;
       }
 
+      public Builder setServersInCurrentConf(List<RaftPeer> 
serversInCurrentConf) {
+        this.serversInCurrentConf = serversInCurrentConf;
+        return this;
+      }
+
+      public Builder setListenersInCurrentConf(List<RaftPeer> 
listenersInCurrentConf) {
+        this.listenersInCurrentConf = listenersInCurrentConf;
+        return this;
+      }
+
       public Builder setMode(Mode mode) {
         this.mode = mode;
         return this;
       }
 
       public Arguments build() {
-        return new Arguments(serversInNewConf, listenersInNewConf, mode);
+        return new Arguments(serversInNewConf, listenersInNewConf, mode, 
serversInCurrentConf,
+            listenersInCurrentConf);
       }
     }
   }
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/SetConfigurationException.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/SetConfigurationException.java
new file mode 100644
index 00000000..c7593b80
--- /dev/null
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/SetConfigurationException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol.exceptions;
+
+public class SetConfigurationException extends RaftException {
+
+  public SetConfigurationException(String message) {
+    super(message);
+  }
+
+  public SetConfigurationException(String message, Throwable t) {
+    super(message, t);
+  }
+}
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index 39f932ca..cdfd9635 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -140,4 +140,22 @@ public interface CollectionUtils {
     return computeIfAbsent(map, key, supplier, () -> {
     });
   }
+
+  static <V> boolean equalsIgnoreOrder(List<V> left, List<V> right, 
Comparator<V> comparator) {
+    if (left == right) {
+      return true;
+    } else if (left == null || right == null) {
+      // only one of them is null (cannot be both null since they are unequal)
+      return false;
+    }
+    final int n = right.size();
+    if (left.size() != n) {
+      return false;
+    }
+    left = new ArrayList<>(left);
+    left.sort(comparator);
+    right = new ArrayList<>(right);
+    right.sort(comparator);
+    return left.equals(right);
+  }
 }
diff --git a/ratis-proto/src/main/proto/Raft.proto 
b/ratis-proto/src/main/proto/Raft.proto
index fa3d15e2..c571f0c7 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -410,11 +410,14 @@ message SetConfigurationRequestProto {
   enum Mode {
     SET_UNCONDITIONALLY = 0;
     ADD = 1;
+    COMPARE_AND_SET = 2;
   }
   RaftRpcRequestProto rpcRequest = 1;
   repeated RaftPeerProto peers = 2;
   repeated RaftPeerProto listeners = 3;
   optional Mode mode = 4;
+  repeated RaftPeerProto currentPeers = 5;
+  repeated RaftPeerProto currentListeners = 6;
 }
 
 // transfer leadership request
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
index 9f203b23..ee01f275 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
@@ -200,8 +200,8 @@ final class RaftConfigurationImpl implements 
RaftConfiguration {
   }
 
   @Override
-  public Collection<RaftPeer> getAllPeers(RaftPeerRole role) {
-    final Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers(role));
+  public List<RaftPeer> getAllPeers(RaftPeerRole role) {
+    final List<RaftPeer> peers = new ArrayList<>(conf.getPeers(role));
     if (oldConf != null) {
       oldConf.getPeers(role).stream()
           .filter(p -> !peers.contains(p))
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 9268b99a..d4711fbd 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.exceptions.SetConfigurationException;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
 import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
@@ -1118,6 +1119,19 @@ class RaftServerImpl implements RaftServer.Division,
       if (arguments.getMode() == SetConfigurationRequest.Mode.ADD) {
         serversInNewConf = add(RaftPeerRole.FOLLOWER, current, arguments);
         listenersInNewConf = add(RaftPeerRole.LISTENER, current, arguments);
+      } else if (arguments.getMode() == 
SetConfigurationRequest.Mode.COMPARE_AND_SET) {
+        final Comparator<RaftPeer> comparator = 
Comparator.comparing(RaftPeer::getId,
+            Comparator.comparing(RaftPeerId::toString));
+        if 
(CollectionUtils.equalsIgnoreOrder(arguments.getServersInCurrentConf(),
+            current.getAllPeers(RaftPeerRole.FOLLOWER), comparator)
+            && 
CollectionUtils.equalsIgnoreOrder(arguments.getListenersInCurrentConf(),
+            current.getAllPeers(RaftPeerRole.LISTENER), comparator)) {
+          serversInNewConf = 
arguments.getPeersInNewConf(RaftPeerRole.FOLLOWER);
+          listenersInNewConf = 
arguments.getPeersInNewConf(RaftPeerRole.LISTENER);
+        } else {
+          throw new SetConfigurationException("Failed to set configuration: 
current configuration "
+              + current + " is different than the request " + request);
+        }
       } else {
         serversInNewConf = arguments.getPeersInNewConf(RaftPeerRole.FOLLOWER);
         listenersInNewConf = 
arguments.getPeersInNewConf(RaftPeerRole.LISTENER);
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index d9b4e93a..eac63d0b 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -31,6 +31,7 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.exceptions.SetConfigurationException;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
 import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
 import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
@@ -216,6 +217,44 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER 
extends MiniRaftCluste
     cluster.close();
   }
 
+  @Test
+  public void testSetConfigurationInCasMode() throws Exception {
+    runWithNewCluster(2, this::runTestSetConfigurationInCasMode);
+  }
+
+  private void runTestSetConfigurationInCasMode(CLUSTER cluster) throws 
Exception {
+    final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+    List<RaftPeer> oldPeers = cluster.getPeers();
+
+    PeerChanges change = cluster.addNewPeers(1, true);
+    List<RaftPeer> peers = Arrays.asList(change.allPeersInNewConf);
+
+    try (final RaftClient client = cluster.createClient(leader.getId())) {
+      for (int i = 0; i < 10; i++) {
+        RaftClientReply reply = client.io().send(new SimpleMessage("m" + i));
+        Assert.assertTrue(reply.isSuccess());
+      }
+
+      testFailureCase("Can't set configuration in CAS mode ",
+          () -> 
client.admin().setConfiguration(SetConfigurationRequest.Arguments.newBuilder()
+              .setServersInNewConf(peers)
+              .setServersInCurrentConf(peers)
+              .setMode(SetConfigurationRequest.Mode.COMPARE_AND_SET)
+              .build()), SetConfigurationException.class);
+
+      Collections.shuffle(oldPeers);
+      RaftClientReply reply = client.admin().setConfiguration(
+          SetConfigurationRequest.Arguments.newBuilder()
+              .setServersInNewConf(peers)
+              .setServersInCurrentConf(oldPeers)
+              .setMode(SetConfigurationRequest.Mode.COMPARE_AND_SET)
+              .build());
+      Assert.assertTrue(reply.isSuccess());
+      waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
+    }
+    cluster.close();
+  }
+
 
 
   @Test(timeout = 30000)

Reply via email to