This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 95ea26c29 RATIS-2095. Extract common logic of ratis-shell to RaftUtils
for reuse (#1098)
95ea26c29 is described below
commit 95ea26c29e457ff0f8aee7ff7106536c42c4dc52
Author: DaveTeng0 <[email protected]>
AuthorDate: Fri Jun 14 07:20:03 2024 -0700
RATIS-2095. Extract common logic of ratis-shell to RaftUtils for reuse
(#1098)
---
.../java/org/apache/ratis/shell/cli/RaftUtils.java | 124 ++++++++++++++++++++-
.../shell/cli/sh/command/AbstractRatisCommand.java | 98 ++++------------
2 files changed, 145 insertions(+), 77 deletions(-)
diff --git
a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/RaftUtils.java
b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/RaftUtils.java
index 1239fc56c..9c5d90e4b 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/RaftUtils.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/RaftUtils.java
@@ -20,20 +20,38 @@ package org.apache.ratis.shell.cli;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.retry.ExponentialBackoffRetry;
import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.function.CheckedFunction;
+import java.io.IOException;
+import java.io.PrintStream;
import java.net.InetSocketAddress;
-import java.util.Properties;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.UUID;
/**
* Helper class for raft operations.
*/
public final class RaftUtils {
+ public static final RaftGroupId DEFAULT_RAFT_GROUP_ID =
RaftGroupId.randomId();
+
private RaftUtils() {
// prevent instantiation
}
@@ -86,4 +104,108 @@ public final class RaftUtils {
.setRetryPolicy(retryPolicy)
.build();
}
+
+ /**
+ * Apply the given function to the elements of the given list, one at a time
+ * in iteration order, and stop at the first element for which the function
+ * returns a non-null value.
+ * A failure of the function on one element does not abort the iteration:
+ * the stack trace is printed and the next element is tried.
+ *
+ * @param list the input parameter list
+ * @param function the function to be applied
+ * @param <PARAMETER> parameter type
+ * @param <RETURN> return value type
+ * @param <EXCEPTION> the exception type thrown by the given function.
+ * @return the first non-null value returned by the given function applied
to the elements of the given list,
+ * or null if the function returned null or threw for every element.
+ */
+ private static <PARAMETER, RETURN, EXCEPTION extends Throwable> RETURN
applyFunctionReturnFirstNonNull(
+ Collection<PARAMETER> list, CheckedFunction<PARAMETER, RETURN,
EXCEPTION> function) {
+ for (PARAMETER parameter : list) {
+ try {
+ RETURN ret = function.apply(parameter);
+ if (ret != null) {
+ return ret; // first usable answer wins; the remaining elements are not tried
+ }
+ } catch (Throwable e) {
+ e.printStackTrace(); // report the failure and fall through to the next element
+ }
+ }
+ return null; // no element produced a non-null value
+ }
+
+ /**
+ * Build the list of raft peers described by a comma-separated address list.
+ * All addresses are parsed first; the peers are built only after every
+ * address has been parsed successfully.
+ *
+ * @param peers comma-separated list of HOST:PORT addresses
+ * @return one {@link RaftPeer} per address, in the same order
+ */
+ public static List<RaftPeer> buildRaftPeersFromStr(String peers) {
+ final String[] hostPorts = peers.split(",");
+ final List<InetSocketAddress> parsed = new ArrayList<>(hostPorts.length);
+ for (String hostPort : hostPorts) {
+ parsed.add(parseInetSocketAddress(hostPort));
+ }
+
+ final List<RaftPeer> raftPeers = new ArrayList<>(parsed.size());
+ for (InetSocketAddress address : parsed) {
+ final RaftPeer peer = RaftPeer.newBuilder()
+ .setId(RaftUtils.getPeerId(address))
+ .setAddress(address)
+ .build();
+ raftPeers.add(peer);
+ }
+ return raftPeers;
+ }
+
+ /**
+ * Convert the group id given on the command line to a {@link RaftGroupId}.
+ *
+ * @param groupId the UUID string of the group, possibly null or empty
+ * @return the group id parsed from the given string,
+ * or {@link #DEFAULT_RAFT_GROUP_ID} if the string is null or empty.
+ * @throws IllegalArgumentException if the string is non-empty but not a valid UUID
+ */
+ public static RaftGroupId buildRaftGroupIdFromStr(String groupId) {
+ // Only a non-empty string can be parsed as a UUID; null and "" select the default id.
+ return groupId != null && !groupId.isEmpty() ?
RaftGroupId.valueOf(UUID.fromString(groupId))
+ : DEFAULT_RAFT_GROUP_ID;
+ }
+
+ /**
+ * Determine the id of the raft group to operate on.
+ *
+ * If the given id is not {@link #DEFAULT_RAFT_GROUP_ID}, it is returned as is.
+ * Otherwise the peers are asked, one at a time, for the list of their groups
+ * and the first non-null answer is used; it must contain exactly one group id.
+ *
+ * @param raftGroupIdFromConfig the group id built from the configuration
+ * @param peers the peers to be asked for their group list
+ * @param client the client used to send the group list requests
+ * @param printStream the stream where a failure message is printed before throwing
+ * @return the given group id if it is not the default one, otherwise the single remote group id.
+ * @throws IOException if no group list could be obtained, or the obtained list
+ * does not contain exactly one group id
+ */
+ public static RaftGroupId retrieveRemoteGroupId(RaftGroupId
raftGroupIdFromConfig,
+ List<RaftPeer> peers,
+ RaftClient client,
PrintStream printStream) throws IOException {
+ if (!DEFAULT_RAFT_GROUP_ID.equals(raftGroupIdFromConfig)) {
+ return raftGroupIdFromConfig;
+ }
+
+ final List<RaftGroupId> groupIds = applyFunctionReturnFirstNonNull(peers,
+ p -> client.getGroupManagementApi(p.getId()).list().getGroupIds());
+
+ final String message;
+ if (groupIds == null) {
+ message = "Failed to get group ID from " + peers;
+ } else if (groupIds.isEmpty()) {
+ // An empty answer is not a "multiple group IDs" situation; report it as such.
+ message = "No group ID was returned from " + peers;
+ } else if (groupIds.size() == 1) {
+ return groupIds.get(0);
+ } else {
+ message = "Unexpected multiple group IDs " + groupIds
+ + ". In such case, the target group ID must be specified.";
+ }
+ printStream.println(message);
+ throw new IOException(message);
+ }
+
+ /**
+ * Ask the given peers, one at a time, for the info of the given group
+ * and return the first non-null reply after checking it with
+ * {@link #processReply(RaftClientReply, Consumer, Supplier)}.
+ *
+ * @param remoteGroupId the id of the group to look up
+ * @param peers the peers to be asked
+ * @param client the client used to send the group info requests
+ * @param printStream the stream where the failure message is printed
+ * @return the successful group info reply
+ * @throws IOException if no reply was obtained or the reply is not successful
+ */
+ public static GroupInfoReply retrieveGroupInfoByGroupId(RaftGroupId
remoteGroupId, List<RaftPeer> peers,
+ RaftClient client,
PrintStream printStream)
+ throws IOException {
+ final Supplier<String> failureMessage =
+ () -> "Failed to get group info for group id " +
remoteGroupId.getUuid() + " from " + peers;
+ final GroupInfoReply reply = applyFunctionReturnFirstNonNull(peers,
+ peer -> client.getGroupManagementApi(peer.getId()).info(remoteGroupId));
+ processReply(reply, printStream::println, failureMessage);
+ return reply;
+ }
+
+ /**
+ * Check the given reply and fail if it is missing or not successful.
+ *
+ * @param reply the reply to check, possibly null
+ * @param printer receives the failure message before the exception is thrown
+ * @param message supplies the failure message; evaluated only on failure
+ * @throws IOException if the reply is null or not successful; its cause is the
+ * exception carried by the reply, or a new {@link RaftException} describing
+ * the reply when there is none.
+ */
+ public static void processReply(RaftClientReply reply, Consumer<String>
printer, Supplier<String> message)
+ throws IOException {
+ if (reply != null && reply.isSuccess()) {
+ return;
+ }
+
+ RaftException cause = reply == null ? null : reply.getException();
+ if (cause == null) {
+ cause = new RaftException("Reply: " + reply);
+ }
+ printer.accept(message.get());
+ throw new IOException(cause.getMessage(), cause);
+ }
+
+ /**
+ * Convert a HOST:PORT string to an {@link InetSocketAddress}.
+ * The string is split at ':' and the first two fields are used as host and port.
+ *
+ * @param address the address string
+ * @return the socket address with the parsed host and port
+ * @throws IllegalArgumentException if the string cannot be parsed;
+ * the underlying failure is attached as the cause.
+ */
+ public static InetSocketAddress parseInetSocketAddress(String address) {
+ try {
+ final String[] fields = address.split(":");
+ if (fields.length >= 2) {
+ final String host = fields[0];
+ final int port = Integer.parseInt(fields[1]);
+ return new InetSocketAddress(host, port);
+ }
+ throw new IllegalArgumentException("Unexpected address format <HOST:PORT>.");
+ } catch (Exception cause) {
+ throw new IllegalArgumentException("Failed to parse the server address parameter \"" + address + "\".", cause);
+ }
+ }
+
}
diff --git
a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
index 1888c0e0e..91bdc873b 100644
---
a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
+++
b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
@@ -18,8 +18,12 @@
package org.apache.ratis.shell.cli.sh.command;
import org.apache.commons.cli.Option;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.shell.cli.RaftUtils;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
@@ -30,48 +34,30 @@ import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.util.ProtoUtils;
-import org.apache.ratis.util.function.CheckedFunction;
import java.io.IOException;
+import java.io.PrintStream;
import java.net.InetSocketAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.ratis.shell.cli.RaftUtils.buildRaftGroupIdFromStr;
+import static org.apache.ratis.shell.cli.RaftUtils.buildRaftPeersFromStr;
+import static org.apache.ratis.shell.cli.RaftUtils.retrieveGroupInfoByGroupId;
+import static org.apache.ratis.shell.cli.RaftUtils.retrieveRemoteGroupId;
+
/**
* The base class for the ratis shell which need to connect to server.
*/
public abstract class AbstractRatisCommand extends AbstractCommand {
public static final String PEER_OPTION_NAME = "peers";
public static final String GROUPID_OPTION_NAME = "groupid";
- public static final RaftGroupId DEFAULT_RAFT_GROUP_ID =
RaftGroupId.randomId();
-
- /**
- * Execute a given function with input parameter from the members of a list.
- *
- * @param list the input parameters
- * @param function the function to be executed
- * @param <T> parameter type
- * @param <K> return value type
- * @param <E> the exception type thrown by the given function.
- * @return the value returned by the given function.
- */
- public static <T, K, E extends Throwable> K run(Collection<T> list,
CheckedFunction<T, K, E> function) {
- for (T t : list) {
- try {
- K ret = function.apply(t);
- if (ret != null) {
- return ret;
- }
- } catch (Throwable e) {
- e.printStackTrace();
- }
- }
- return null;
- }
-
private RaftGroup raftGroup;
private GroupInfoReply groupInfoReply;
@@ -81,46 +67,13 @@ public abstract class AbstractRatisCommand extends
AbstractCommand {
@Override
public int run(CommandLine cl) throws IOException {
- List<InetSocketAddress> addresses = new ArrayList<>();
- String peersStr = cl.getOptionValue(PEER_OPTION_NAME);
- String[] peersArray = peersStr.split(",");
- for (String peer : peersArray) {
- addresses.add(parseInetSocketAddress(peer));
- }
-
- final RaftGroupId raftGroupIdFromConfig =
cl.hasOption(GROUPID_OPTION_NAME)?
-
RaftGroupId.valueOf(UUID.fromString(cl.getOptionValue(GROUPID_OPTION_NAME)))
- : DEFAULT_RAFT_GROUP_ID;
-
- List<RaftPeer> peers = addresses.stream()
- .map(addr -> RaftPeer.newBuilder()
- .setId(RaftUtils.getPeerId(addr))
- .setAddress(addr)
- .build()
- ).collect(Collectors.toList());
+ List<RaftPeer> peers =
buildRaftPeersFromStr(cl.getOptionValue(PEER_OPTION_NAME));
+ RaftGroupId raftGroupIdFromConfig =
buildRaftGroupIdFromStr(cl.getOptionValue(GROUPID_OPTION_NAME));
raftGroup = RaftGroup.valueOf(raftGroupIdFromConfig, peers);
+ PrintStream printStream = getPrintStream();
try (final RaftClient client = RaftUtils.createClient(raftGroup)) {
- final RaftGroupId remoteGroupId;
- if (raftGroupIdFromConfig != DEFAULT_RAFT_GROUP_ID) {
- remoteGroupId = raftGroupIdFromConfig;
- } else {
- final List<RaftGroupId> groupIds = run(peers,
- p ->
client.getGroupManagementApi((p.getId())).list().getGroupIds());
-
- if (groupIds == null) {
- println("Failed to get group ID from " + peers);
- return -1;
- } else if (groupIds.size() == 1) {
- remoteGroupId = groupIds.get(0);
- } else {
- println("There are more than one groups, you should specific one. "
+ groupIds);
- return -2;
- }
- }
-
- groupInfoReply = run(peers, p ->
client.getGroupManagementApi((p.getId())).info(remoteGroupId));
- processReply(groupInfoReply,
- () -> "Failed to get group info for group id " +
remoteGroupId.getUuid() + " from " + peers);
+ final RaftGroupId remoteGroupId =
retrieveRemoteGroupId(raftGroupIdFromConfig, peers, client, printStream);
+ groupInfoReply = retrieveGroupInfoByGroupId(remoteGroupId, peers,
client, printStream);
raftGroup = groupInfoReply.getGroup();
}
return 0;
@@ -168,14 +121,7 @@ public abstract class AbstractRatisCommand extends
AbstractCommand {
}
protected void processReply(RaftClientReply reply, Supplier<String>
messageSupplier) throws IOException {
- if (reply == null || !reply.isSuccess()) {
- final RaftException e = Optional.ofNullable(reply)
- .map(RaftClientReply::getException)
- .orElseGet(() -> new RaftException("Reply: " + reply));
- final String message = messageSupplier.get();
- printf("%s. Error: %s%n", message, e);
- throw new IOException(message, e);
- }
+ RaftUtils.processReply(reply, getPrintStream()::println, messageSupplier);
}
protected List<RaftPeerId> getIds(String[] optionValues,
BiConsumer<RaftPeerId, InetSocketAddress> consumer) {