This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 081940a95 RATIS-1911. Add MembershipManager example. (#941)
081940a95 is described below
commit 081940a95b050193c94f9038af11983cdbc56c59
Author: Jinglun <[email protected]>
AuthorDate: Wed Oct 25 09:45:23 2023 +0800
RATIS-1911. Add MembershipManager example. (#941)
---
.../counter/server/CounterStateMachine.java | 2 +-
.../ratis/examples/membership/server/CServer.java | 103 ++++++++++
.../ratis/examples/membership/server/Console.java | 140 ++++++++++++++
.../examples/membership/server/RaftCluster.java | 207 +++++++++++++++++++++
4 files changed, 451 insertions(+), 1 deletion(-)
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
index 299b67325..90f41ddad 100644
---
a/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/counter/server/CounterStateMachine.java
@@ -82,7 +82,7 @@ public class CounterStateMachine extends BaseStateMachine {
private final TimeDuration simulatedSlowness;
- CounterStateMachine(TimeDuration simulatedSlowness) {
+ public CounterStateMachine(TimeDuration simulatedSlowness) {
this.simulatedSlowness = simulatedSlowness;
}
CounterStateMachine() {
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java
new file mode 100644
index 000000000..69cc4d58d
--- /dev/null
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/CServer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.membership.server;
+
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.examples.counter.server.CounterStateMachine;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.TimeDuration;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * A simple raft server using {@link CounterStateMachine}.
+ */
+public class CServer implements Closeable {
+ public static final RaftGroupId GROUP_ID = RaftGroupId.randomId();
+ public static final String LOCAL_ADDR = "0.0.0.0";
+
+ private final RaftServer server;
+ private final int port;
+ private final File storageDir;
+
+ public CServer(RaftGroup group, RaftPeerId serverId, int port) throws
IOException {
+ this.storageDir = new File("./" + serverId);
+ this.port = port;
+
+ final RaftProperties properties = new RaftProperties();
+ RaftServerConfigKeys.setStorageDir(properties,
Collections.singletonList(storageDir));
+ RaftConfigKeys.Rpc.setType(properties, SupportedRpcType.NETTY);
+ NettyConfigKeys.Server.setPort(properties, port);
+
+ // create the counter state machine which holds the counter value.
+ final CounterStateMachine counterStateMachine = new
CounterStateMachine(TimeDuration.ZERO);
+
+ // build the Raft server.
+ this.server = RaftServer.newBuilder()
+ .setGroup(group)
+ .setProperties(properties)
+ .setServerId(serverId)
+ .setStateMachine(counterStateMachine)
+ .setOption(RaftStorage.StartupOption.FORMAT)
+ .build();
+ }
+
+ public void start() throws IOException {
+ server.start();
+ }
+
+ public RaftPeer getPeer() {
+ return server.getPeer();
+ }
+
+ @Override
+ public void close() throws IOException {
+ server.close();
+ FileUtils.deleteFully(storageDir);
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return MoreObjects.toStringHelper(this)
+ .add("server", server.getPeer())
+ .add("role", server.getDivision(GROUP_ID).getInfo().getCurrentRole())
+ .toString();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public int getPort() {
+ return port;
+ }
+}
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/Console.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/Console.java
new file mode 100644
index 000000000..1015d59e2
--- /dev/null
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/Console.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.membership.server;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Scanner;
+
+/**
+ * Interactive command line console.
+ */
+public class Console {
+ public static final String USAGE_MSG =
+ "Usage: java org.apache.ratis.examples.membership.server.Console
[options]\n"
+ + "Options:\n"
+ + "\tupdate [new_peer_ports] Update membership to C_new. Separate
ports with comma. "
+ + "e.g. update 5100,5101\n"
+ + "\tadd [peer_port] Add peer with peer_port to raft cluster.
e.g. add 5103\n"
+ + "\tremove [peer_port] Remove peer with peer_port from raft
cluster. e.g. remove"
+ + " 5100\n"
+ + "\tshow Show all peers of raft cluster.\n"
+ + "\tincr Increment the counter value.\n"
+ + "\tquery Query the value of counter.\n"
+ + "\tquit Quit.";
+
+ private final Scanner sc = new Scanner(System.in, "UTF-8");
+ private final RaftCluster cluster = new RaftCluster();
+
+ private void init() {
+ System.out.println("Raft Server Membership Example.");
+ System.out.println("Type ports seperated by comma for initial peers. e.g.
5100,5101,5102");
+
+ String[] portArguments = commandLineInput()[0].split(",");
+ List<Integer> ports = new ArrayList<>();
+ Arrays.stream(portArguments).map(Integer::parseInt).forEach(ports::add);
+ try {
+ cluster.init(ports);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ show();
+ System.out.println(USAGE_MSG);
+ }
+
+ private void execute() {
+ while (true) {
+ try {
+ String[] args = commandLineInput();
+ String command = args[0];
+
+ if (command.equalsIgnoreCase("show")) {
+ show();
+ } else if (command.equalsIgnoreCase("add")) {
+ add(args, 1);
+ } else if (command.equalsIgnoreCase("remove")) {
+ remove(args, 1);
+ } else if (command.equalsIgnoreCase("update")) {
+ update(args, 1);
+ } else if (command.equalsIgnoreCase("incr")) {
+ cluster.counterIncrement();
+ } else if (command.equalsIgnoreCase("query")) {
+ cluster.queryCounter();
+ } else if (command.equalsIgnoreCase("quit")) {
+ break;
+ } else {
+ System.out.println(USAGE_MSG);
+ }
+ } catch (Exception e) {
+ System.out.println("Get error " + e.getMessage());
+ }
+ }
+ try {
+ System.out.println("Closing cluster...");
+ cluster.close();
+ System.out.println("Cluster closed successfully.");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void show() {
+ cluster.show();
+ }
+
+ private void add(String[] args, int index) throws IOException {
+ int port = Integer.parseInt(args[index]);
+ List<Integer> ports = new ArrayList();
+ ports.add(port);
+ ports.addAll(cluster.ports());
+ cluster.update(ports);
+ }
+
+ private void remove(String[] args, int index) throws IOException {
+ int port = Integer.parseInt(args[index]);
+ List<Integer> ports = new ArrayList<>();
+ ports.addAll(cluster.ports());
+ if (ports.remove(Integer.valueOf(port))) {
+ cluster.update(ports);
+ } else {
+ System.out.println("Invalid port " + port);
+ }
+ }
+
+ private void update(String[] args, int index) throws IOException {
+ String[] portStrArray = args[index].split(",");
+ List<Integer> ports = new ArrayList<>();
+ for (String portStr : portStrArray) {
+ ports.add(Integer.parseInt(portStr));
+ }
+ cluster.update(ports);
+ }
+
+ private String[] commandLineInput() {
+ System.out.print(">>> ");
+ return sc.nextLine().split(" ");
+ }
+
+ public static void main(String[] args) throws IOException {
+ Console console = new Console();
+ console.init();
+ console.execute();
+ }
+}
diff --git
a/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/RaftCluster.java
b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/RaftCluster.java
new file mode 100644
index 000000000..ec1c6e095
--- /dev/null
+++
b/ratis-examples/src/main/java/org/apache/ratis/examples/membership/server/RaftCluster.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.membership.server;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.examples.counter.CounterCommand;
+import org.apache.ratis.netty.NettyFactory;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.ratis.examples.membership.server.CServer.GROUP_ID;
+import static org.apache.ratis.examples.membership.server.CServer.LOCAL_ADDR;
+
+/**
+ * An in process raft cluster. Running all servers in a single process.
+ */
+public class RaftCluster {
+ private Map<Integer, CServer> members = new HashMap<>();
+
+ /**
+ * Start cluster.
+ *
+ * @param initPorts the ports of the initial peers.
+ */
+ public void init(Collection<Integer> initPorts) throws IOException {
+ RaftGroup group = initGroup(initPorts);
+ for (int port : initPorts) {
+ CServer server = new CServer(group, peerId(port), port);
+ server.start();
+ members.put(port, server);
+ }
+ }
+
+ /**
+ * Update membership to C_new.
+ *
+ * @param newPorts the ports of the C_new peers.
+ */
+ public void update(Collection<Integer> newPorts) throws IOException {
+ Preconditions.assertTrue(members.size() > 0, "Cluster is empty.");
+
+ Collection<CServer> oldPeers = members.values();
+ List<CServer> newPeers = new ArrayList<>();
+ List<CServer> peerToStart = new ArrayList<>();
+ List<CServer> peerToStop = new ArrayList<>();
+
+ for (Integer port : newPorts) {
+ CServer server = members.get(port);
+ if (server == null) {
+ // New peer always start with an empty group.
+ RaftGroup group = RaftGroup.valueOf(GROUP_ID);
+ server = new CServer(group, peerId(port), port);
+ peerToStart.add(server);
+ }
+ newPeers.add(server);
+ }
+
+ for (CServer peer : oldPeers) {
+ if (!newPeers.contains(peer)) {
+ peerToStop.add(peer);
+ }
+ }
+
+ // Step 1: start new peers.
+ System.out.println("Update membership ...... Step 1: start new peers.");
+ System.out.println(peersInfo(peerToStart, "Peers_to_start"));
+ for (CServer server : peerToStart) {
+ server.start();
+ }
+
+ // Step 2: update membership.
+ System.out.println("Update membership ...... Step 2: update membership
from C_old to C_new.");
+ System.out.println(peersInfo(oldPeers, "C_old"));
+ System.out.println(peersInfo(newPeers, "C_new"));
+ if (members.size() > 0) {
+ try (RaftClient client = createClient()) {
+ RaftClientReply reply =
client.admin().setConfiguration(newPeers.stream()
+ .map(CServer::getPeer).collect(Collectors.toList()));
+ if (!reply.isSuccess()) {
+ throw reply.getException();
+ }
+ }
+ }
+
+ // Step 3: stop outdated peers.
+ System.out.println("Update membership ...... Step 3: stop outdated
peers.");
+ System.out.println(peersInfo(peerToStop, "Peers_to_stop"));
+ for (CServer server : peerToStop) {
+ server.close();
+ members.remove(server.getPort());
+ }
+
+ // Add new peers to members.
+ for (CServer server : peerToStart) {
+ members.put(server.getPort(), server);
+ }
+ }
+
+ public void show() {
+ Collection<CServer> peers = members.values();
+ System.out.println(peersInfo(peers, "Cluster members"));
+ }
+
+ public void counterIncrement() throws IOException {
+ RaftClient client = createClient();
+ try {
+ RaftClientReply reply =
client.io().send(CounterCommand.INCREMENT.getMessage());
+ if (!reply.isSuccess()) {
+ throw reply.getException();
+ }
+ } finally {
+ client.close();
+ }
+ }
+
+ public void queryCounter() throws IOException {
+ RaftClient client = createClient();
+ try {
+ RaftClientReply reply =
client.io().sendReadOnly(CounterCommand.GET.getMessage());
+ String count = reply.getMessage().getContent().toStringUtf8();
+ System.out.println("Current counter value: " + count);
+ } finally {
+ client.close();
+ }
+ }
+
+ /**
+ * Configure the raft group with initial peers.
+ */
+ private RaftGroup initGroup(Collection<Integer> ports) {
+ List<RaftPeer> peers = new ArrayList<>();
+ for (int port : ports) {
+ peers.add(RaftPeer.newBuilder()
+ .setId(peerId(port))
+ .setAddress(LOCAL_ADDR + ":" + port)
+ .build());
+ }
+ members.values().stream().map(CServer::getPeer).forEach(peers::add);
+ return RaftGroup.valueOf(GROUP_ID, peers);
+ }
+
+ public Collection<Integer> ports() {
+ return members.keySet();
+ }
+
+ public void close() throws IOException {
+ for (CServer server : members.values()) {
+ server.close();
+ }
+ }
+
+ private RaftClient createClient() {
+ RaftProperties properties = new RaftProperties();
+ RaftClient.Builder builder =
RaftClient.newBuilder().setProperties(properties);
+
+ builder.setRaftGroup(RaftGroup.valueOf(GROUP_ID,
+ members.values().stream().map(s ->
s.getPeer()).collect(Collectors.toList())));
+
+ builder.setClientRpc(new NettyFactory(new
Parameters()).newRaftClientRpc(ClientId.randomId(), properties));
+
+ return builder.build();
+ }
+
+ private static RaftPeerId peerId(int port) {
+ return RaftPeerId.valueOf("p" + port);
+ }
+
+ private static String peersInfo(Collection<CServer> peers, String prefix) {
+ StringBuilder msgBuilder = new StringBuilder(prefix).append("={");
+ if (peers.size() == 0) {
+ msgBuilder.append("}");
+ } else {
+ peers.forEach(p -> msgBuilder.append("\n\t").append(p));
+ msgBuilder.append("\n}");
+ }
+ return msgBuilder.toString();
+ }
+}