This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 081940a95 RATIS-1911. Add MembershipManager example. (#941)
081940a95 is described below

commit 081940a95b050193c94f9038af11983cdbc56c59
Author: Jinglun <[email protected]>
AuthorDate: Wed Oct 25 09:45:23 2023 +0800

    RATIS-1911. Add MembershipManager example. (#941)
---
 .../counter/server/CounterStateMachine.java        |   2 +-
 .../ratis/examples/membership/server/CServer.java  | 103 ++++++++++
 .../ratis/examples/membership/server/Console.java  | 140 ++++++++++++++
 .../examples/membership/server/RaftCluster.java    | 207 +++++++++++++++++++++
 4 files changed, 451 insertions(+), 1 deletion(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
index 299b67325..90f41ddad 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -82,7 +82,7 @@ public class CounterStateMachine extends BaseStateMachine {
 
   private final TimeDuration simulatedSlowness;
 
-  CounterStateMachine(TimeDuration simulatedSlowness) {
+  public CounterStateMachine(TimeDuration simulatedSlowness) {
     this.simulatedSlowness = simulatedSlowness;
   }
   CounterStateMachine() {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java
new file mode 100644
index 000000000..69cc4d58d
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.membership.server;
+
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.examples.counter.server.CounterStateMachine;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.TimeDuration;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * A simple raft server using {@link CounterStateMachine}.
+ *
+ * Each instance keeps its raft storage in the directory {@code ./<serverId>}
+ * and removes that directory again in {@link #close()}.
+ */
+public class CServer implements Closeable {
+  /** The id of the raft group used by this example; a new random id is generated when the class is loaded. */
+  public static final RaftGroupId GROUP_ID = RaftGroupId.randomId();
+  /** The host part used when building peer addresses for this example. */
+  public static final String LOCAL_ADDR = "0.0.0.0";
+
+  private final RaftServer server;
+  private final int port;
+  private final File storageDir;
+
+  /**
+   * Build (but do not start) a raft server.
+   *
+   * @param group the raft group passed to the server builder.
+   * @param serverId the id of this server; also used as the name of the storage directory.
+   * @param port the port configured for the Netty RPC server.
+   * @throws IOException if the raft server cannot be built.
+   */
+  public CServer(RaftGroup group, RaftPeerId serverId, int port) throws IOException {
+    this.storageDir = new File("./" + serverId);
+    this.port = port;
+
+    final RaftProperties properties = new RaftProperties();
+    RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
+    RaftConfigKeys.Rpc.setType(properties, SupportedRpcType.NETTY);
+    NettyConfigKeys.Server.setPort(properties, port);
+
+    // create the counter state machine which holds the counter value.
+    final CounterStateMachine counterStateMachine = new CounterStateMachine(TimeDuration.ZERO);
+
+    // build the Raft server.
+    this.server = RaftServer.newBuilder()
+        .setGroup(group)
+        .setProperties(properties)
+        .setServerId(serverId)
+        .setStateMachine(counterStateMachine)
+        .setOption(RaftStorage.StartupOption.FORMAT)
+        .build();
+  }
+
+  /**
+   * Start the underlying raft server.
+   *
+   * @throws IOException if the server fails to start.
+   */
+  public void start() throws IOException {
+    server.start();
+  }
+
+  /** @return the {@link RaftPeer} describing the underlying raft server. */
+  public RaftPeer getPeer() {
+    return server.getPeer();
+  }
+
+  /**
+   * Close the raft server and delete its storage directory.
+   * The storage directory is deleted even when closing the server fails.
+   *
+   * @throws IOException if closing the server or deleting the storage directory fails.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      server.close();
+    } finally {
+      FileUtils.deleteFully(storageDir);
+    }
+  }
+
+  /**
+   * @return a description containing the peer and its current role.
+   *         When the role cannot be looked up, the failure is reported in place of the role
+   *         so that printing a server never throws.
+   */
+  @Override
+  public String toString() {
+    final MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this)
+        .add("server", server.getPeer());
+    try {
+      helper.add("role", server.getDivision(GROUP_ID).getInfo().getCurrentRole());
+    } catch (IOException e) {
+      helper.add("role", "unavailable (" + e.getMessage() + ")");
+    }
+    return helper.toString();
+  }
+
+  /** @return the port given to the constructor. */
+  public int getPort() {
+    return port;
+  }
+}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/Console.java b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/Console.java
new file mode 100644
index 000000000..1015d59e2
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/Console.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.membership.server;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Scanner;
+
+/**
+ * Interactive command line console.
+ *
+ * It first asks for the ports of the initial peers, starts a {@link RaftCluster} with them
+ * and then executes the commands typed by the user until "quit" or the end of the input.
+ */
+public class Console {
+  public static final String USAGE_MSG =
+    "Usage: java org.apache.ratis.examples.membership.server.Console [options]\n"
+      + "Options:\n"
+      + "\tupdate [new_peer_ports]   Update membership to C_new. Separate ports with comma. "
+      + "e.g. update 5100,5101\n"
+      + "\tadd [peer_port]           Add peer with peer_port to raft cluster. e.g. add 5103\n"
+      + "\tremove  [peer_port]       Remove peer with peer_port from raft cluster. e.g. remove"
+      + " 5100\n"
+      + "\tshow                      Show all peers of raft cluster.\n"
+      + "\tincr                      Increment the counter value.\n"
+      + "\tquery                     Query the value of counter.\n"
+      + "\tquit                      Quit.";
+
+  private final Scanner sc = new Scanner(System.in, "UTF-8");
+  private final RaftCluster cluster = new RaftCluster();
+
+  /** Read the ports of the initial peers from the input and start the cluster with them. */
+  private void init() {
+    System.out.println("Raft Server Membership Example.");
+    System.out.println("Type ports seperated by comma for initial peers. e.g. 5100,5101,5102");
+
+    final String[] input = commandLineInput();
+    if (input.length == 0) {
+      throw new IllegalStateException("Input ended before the initial peers were given.");
+    }
+    final List<Integer> ports = parsePorts(input[0]);
+    try {
+      cluster.init(ports);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    show();
+    System.out.println(USAGE_MSG);
+  }
+
+  /** Run the command loop until "quit" or the end of the input, then close the cluster. */
+  private void execute() {
+    while (true) {
+      try {
+        String[] args = commandLineInput();
+        if (args.length == 0) {
+          // End of input: there is nothing more to read, so behave like "quit"
+          // instead of failing on every iteration forever.
+          break;
+        }
+        String command = args[0];
+
+        if (command.equalsIgnoreCase("show")) {
+          show();
+        } else if (command.equalsIgnoreCase("add")) {
+          add(args, 1);
+        } else if (command.equalsIgnoreCase("remove")) {
+          remove(args, 1);
+        } else if (command.equalsIgnoreCase("update")) {
+          update(args, 1);
+        } else if (command.equalsIgnoreCase("incr")) {
+          cluster.counterIncrement();
+        } else if (command.equalsIgnoreCase("query")) {
+          cluster.queryCounter();
+        } else if (command.equalsIgnoreCase("quit")) {
+          break;
+        } else {
+          System.out.println(USAGE_MSG);
+        }
+      } catch (Exception e) {
+        // A failed command must not terminate the console; report it and read the next one.
+        System.out.println("Get error " + e.getMessage());
+      }
+    }
+    try {
+      System.out.println("Closing cluster...");
+      cluster.close();
+      System.out.println("Cluster closed successfully.");
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void show() {
+    cluster.show();
+  }
+
+  /** Add the peer whose port is given at {@code args[index]} to the cluster. */
+  private void add(String[] args, int index) throws IOException {
+    if (isArgumentMissing(args, index)) {
+      return;
+    }
+    int port = Integer.parseInt(args[index]);
+    if (cluster.ports().contains(port)) {
+      // Avoid passing the same port twice to the membership update.
+      System.out.println("Port " + port + " is already a member.");
+      return;
+    }
+    List<Integer> ports = new ArrayList<>();
+    ports.add(port);
+    ports.addAll(cluster.ports());
+    cluster.update(ports);
+  }
+
+  /** Remove the peer whose port is given at {@code args[index]} from the cluster. */
+  private void remove(String[] args, int index) throws IOException {
+    if (isArgumentMissing(args, index)) {
+      return;
+    }
+    int port = Integer.parseInt(args[index]);
+    List<Integer> ports = new ArrayList<>(cluster.ports());
+    // Integer.valueOf selects remove(Object); a plain int would select remove(int index).
+    if (ports.remove(Integer.valueOf(port))) {
+      cluster.update(ports);
+    } else {
+      System.out.println("Invalid port " + port);
+    }
+  }
+
+  /** Update the membership to the comma separated ports given at {@code args[index]}. */
+  private void update(String[] args, int index) throws IOException {
+    if (isArgumentMissing(args, index)) {
+      return;
+    }
+    cluster.update(parsePorts(args[index]));
+  }
+
+  /**
+   * Print the usage message when the command has no argument at the given index.
+   *
+   * @return true if the argument is missing.
+   */
+  private static boolean isArgumentMissing(String[] args, int index) {
+    if (args.length > index) {
+      return false;
+    }
+    System.out.println(USAGE_MSG);
+    return true;
+  }
+
+  /**
+   * Parse a comma separated list of ports.
+   *
+   * @throws NumberFormatException if one of the elements is not an integer.
+   */
+  private static List<Integer> parsePorts(String commaSeparatedPorts) {
+    List<Integer> ports = new ArrayList<>();
+    Arrays.stream(commaSeparatedPorts.split(",")).map(Integer::parseInt).forEach(ports::add);
+    return ports;
+  }
+
+  /**
+   * Prompt for and read one line of input.
+   *
+   * @return the whitespace separated tokens of the line (at least one element, possibly empty),
+   *         or an empty array when the input has ended.
+   */
+  private String[] commandLineInput() {
+    System.out.print(">>> ");
+    if (!sc.hasNextLine()) {
+      return new String[0];
+    }
+    // Trim and split on runs of whitespace so that extra spaces do not produce empty tokens.
+    return sc.nextLine().trim().split("\\s+");
+  }
+
+  public static void main(String[] args) throws IOException {
+    Console console = new Console();
+    console.init();
+    console.execute();
+  }
+}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/RaftCluster.java b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/RaftCluster.java
new file mode 100644
index 000000000..ec1c6e095
--- /dev/null
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/RaftCluster.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.membership.server;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.examples.counter.CounterCommand;
+import org.apache.ratis.netty.NettyFactory;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.ratis.examples.membership.server.CServer.GROUP_ID;
+import static org.apache.ratis.examples.membership.server.CServer.LOCAL_ADDR;
+
+/**
+ * An in process raft cluster. Running all servers in a single process.
+ *
+ * The servers are tracked by their port; see {@link #update(Collection)} for membership changes.
+ */
+public class RaftCluster {
+  /** The servers currently tracked by this cluster, keyed by port. */
+  private final Map<Integer, CServer> members = new HashMap<>();
+
+  /**
+   * Start cluster.
+   *
+   * @param initPorts the ports of the initial peers.
+   * @throws IOException if a server cannot be created or started.
+   */
+  public void init(Collection<Integer> initPorts) throws IOException {
+    RaftGroup group = initGroup(initPorts);
+    for (int port : initPorts) {
+      CServer server = new CServer(group, peerId(port), port);
+      server.start();
+      members.put(port, server);
+    }
+  }
+
+  /**
+   * Update membership to C_new.
+   *
+   * Peers that are new in C_new are started first, then the configuration is changed,
+   * and finally the peers that are no longer in C_new are stopped.
+   * If starting the new peers or changing the configuration fails,
+   * the newly created peers are closed again and the tracked members are left unchanged.
+   *
+   * @param newPorts the ports of the C_new peers; duplicated ports are ignored.
+   * @throws IOException if a peer cannot be created, started or stopped,
+   *                     or if the configuration change is not successful.
+   */
+  public void update(Collection<Integer> newPorts) throws IOException {
+    Preconditions.assertTrue(members.size() > 0, "Cluster is empty.");
+
+    // Snapshot of C_old; members is modified below.
+    final List<CServer> oldPeers = new ArrayList<>(members.values());
+    final List<CServer> newPeers = new ArrayList<>();
+    final List<CServer> peerToStart = new ArrayList<>();
+    final List<CServer> peerToStop = new ArrayList<>();
+
+    try {
+      final List<Integer> distinctPorts = newPorts.stream().distinct().collect(Collectors.toList());
+      for (Integer port : distinctPorts) {
+        CServer server = members.get(port);
+        if (server == null) {
+          // New peer always start with an empty group.
+          RaftGroup group = RaftGroup.valueOf(GROUP_ID);
+          server = new CServer(group, peerId(port), port);
+          peerToStart.add(server);
+        }
+        newPeers.add(server);
+      }
+
+      for (CServer peer : oldPeers) {
+        if (!newPeers.contains(peer)) {
+          peerToStop.add(peer);
+        }
+      }
+
+      // Step 1: start new peers.
+      System.out.println("Update membership ...... Step 1: start new peers.");
+      System.out.println(peersInfo(peerToStart, "Peers_to_start"));
+      for (CServer server : peerToStart) {
+        server.start();
+      }
+
+      // Step 2: update membership.
+      System.out.println("Update membership ...... Step 2: update membership from C_old to C_new.");
+      System.out.println(peersInfo(oldPeers, "C_old"));
+      System.out.println(peersInfo(newPeers, "C_new"));
+      try (RaftClient client = createClient()) {
+        RaftClientReply reply = client.admin().setConfiguration(newPeers.stream()
+            .map(CServer::getPeer).collect(Collectors.toList()));
+        if (!reply.isSuccess()) {
+          throw reply.getException();
+        }
+      }
+    } catch (IOException | RuntimeException e) {
+      // The new peers are not tracked in members yet; close them here so that
+      // they do not keep running (and holding their ports) after a failed update.
+      closeQuietly(peerToStart, e);
+      throw e;
+    }
+
+    // Add new peers to members before stopping anything,
+    // so that they stay tracked even if stopping an outdated peer fails.
+    for (CServer server : peerToStart) {
+      members.put(server.getPort(), server);
+    }
+
+    // Step 3: stop outdated peers.
+    System.out.println("Update membership ...... Step 3: stop outdated peers.");
+    System.out.println(peersInfo(peerToStop, "Peers_to_stop"));
+    for (CServer server : peerToStop) {
+      server.close();
+      members.remove(server.getPort());
+    }
+  }
+
+  /** Print all the servers tracked by this cluster. */
+  public void show() {
+    Collection<CServer> peers = members.values();
+    System.out.println(peersInfo(peers, "Cluster members"));
+  }
+
+  /**
+   * Send an INCREMENT command to the cluster.
+   *
+   * @throws IOException if the request fails or the reply is not successful.
+   */
+  public void counterIncrement() throws IOException {
+    try (RaftClient client = createClient()) {
+      RaftClientReply reply = client.io().send(CounterCommand.INCREMENT.getMessage());
+      if (!reply.isSuccess()) {
+        throw reply.getException();
+      }
+    }
+  }
+
+  /**
+   * Send a read-only GET command to the cluster and print the returned counter value.
+   *
+   * @throws IOException if the request fails or the reply is not successful.
+   */
+  public void queryCounter() throws IOException {
+    try (RaftClient client = createClient()) {
+      RaftClientReply reply = client.io().sendReadOnly(CounterCommand.GET.getMessage());
+      if (!reply.isSuccess()) {
+        throw reply.getException();
+      }
+      String count = reply.getMessage().getContent().toStringUtf8();
+      System.out.println("Current counter value: " + count);
+    }
+  }
+
+  /**
+   * Configure the raft group with initial peers.
+   */
+  private RaftGroup initGroup(Collection<Integer> ports) {
+    List<RaftPeer> peers = new ArrayList<>();
+    for (int port : ports) {
+      peers.add(RaftPeer.newBuilder()
+          .setId(peerId(port))
+          .setAddress(LOCAL_ADDR + ":" + port)
+          .build());
+    }
+    members.values().stream().map(CServer::getPeer).forEach(peers::add);
+    return RaftGroup.valueOf(GROUP_ID, peers);
+  }
+
+  /** @return a snapshot of the ports of the tracked servers; changing it does not affect the cluster. */
+  public Collection<Integer> ports() {
+    return new ArrayList<>(members.keySet());
+  }
+
+  /**
+   * Close all the tracked servers.
+   * Every server is closed even if closing an earlier one fails;
+   * the first failure is rethrown with the later ones attached as suppressed.
+   *
+   * @throws IOException if closing any of the servers fails.
+   */
+  public void close() throws IOException {
+    IOException failure = null;
+    for (CServer server : members.values()) {
+      try {
+        server.close();
+      } catch (IOException e) {
+        if (failure == null) {
+          failure = e;
+        } else {
+          failure.addSuppressed(e);
+        }
+      }
+    }
+    members.clear();
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
+  /** Create a client for the group consisting of the currently tracked servers. */
+  private RaftClient createClient() {
+    RaftProperties properties = new RaftProperties();
+    RaftClient.Builder builder = RaftClient.newBuilder().setProperties(properties);
+
+    builder.setRaftGroup(RaftGroup.valueOf(GROUP_ID,
+        members.values().stream().map(CServer::getPeer).collect(Collectors.toList())));
+
+    builder.setClientRpc(new NettyFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), properties));
+
+    return builder.build();
+  }
+
+  /** Close the given servers; a failure to close is attached to the given cause as suppressed. */
+  private static void closeQuietly(Collection<CServer> servers, Throwable cause) {
+    for (CServer server : servers) {
+      try {
+        server.close();
+      } catch (IOException | RuntimeException e) {
+        cause.addSuppressed(e);
+      }
+    }
+  }
+
+  private static RaftPeerId peerId(int port) {
+    return RaftPeerId.valueOf("p" + port);
+  }
+
+  /** Build a multi-line description of the given peers, starting with the given prefix. */
+  private static String peersInfo(Collection<CServer> peers, String prefix) {
+    StringBuilder msgBuilder = new StringBuilder(prefix).append("={");
+    if (peers.isEmpty()) {
+      msgBuilder.append("}");
+    } else {
+      peers.forEach(p -> msgBuilder.append("\n\t").append(p));
+      msgBuilder.append("\n}");
+    }
+    return msgBuilder.toString();
+  }
+}

Reply via email to