This is an automated email from the ASF dual-hosted git repository. williamsong pushed a commit to branch branch-2_readIndex in repository https://gitbox.apache.org/repos/asf/ratis.git
commit ad03a7be4f97b77dea9e18d45619f97c1fb87757 Author: qian0817 <[email protected]> AuthorDate: Fri Mar 31 16:37:37 2023 +0800 RATIS-1826: Listener will change to follower when using ratis shell --- .../apache/ratis/client/impl/ClientProtoUtils.java | 3 ++- .../org/apache/ratis/protocol/GroupInfoReply.java | 19 ++++++++++++---- ratis-proto/src/main/proto/Raft.proto | 1 + .../apache/ratis/server/impl/RaftServerImpl.java | 7 ++++-- .../apache/ratis/server/raftlog/LogProtoUtils.java | 2 +- .../shell/cli/sh/command/AbstractRatisCommand.java | 19 ++++++++++++++++ .../shell/cli/sh/election/TransferCommand.java | 6 ++++-- .../apache/ratis/shell/cli/sh/peer/AddCommand.java | 8 +++++-- .../ratis/shell/cli/sh/peer/RemoveCommand.java | 10 ++++++--- .../shell/cli/sh/peer/SetPriorityCommand.java | 25 ++++++++++------------ 10 files changed, 71 insertions(+), 29 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index cad0fdc74..60e5aa757 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -539,7 +539,8 @@ public interface ClientProtoUtils { replyProto.getCommitInfosList(), ProtoUtils.toRaftGroup(replyProto.getGroup()), replyProto.getRole(), - replyProto.getIsRaftStorageHealthy()); + replyProto.getIsRaftStorageHealthy(), + replyProto.hasConf()? replyProto.getConf(): null); } static Message toMessage(final ClientMessageEntryProto p) { diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java index 946bf2389..632fa6529 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java @@ -17,10 +17,12 @@ */ package org.apache.ratis.protocol; +import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; import java.util.Collection; +import java.util.Optional; /** * The response of server information request. Sent from server to client. @@ -30,21 +32,26 @@ public class GroupInfoReply extends RaftClientReply { private final RaftGroup group; private final RoleInfoProto roleInfoProto; private final boolean isRaftStorageHealthy; + private final RaftConfigurationProto conf; public GroupInfoReply(RaftClientRequest request, Collection<CommitInfoProto> commitInfos, - RaftGroup group, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy) { - this(request.getClientId(), request.getServerId(), request.getRaftGroupId(), request.getCallId(), commitInfos, - group, roleInfoProto, isRaftStorageHealthy); + RaftGroup group, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy, + RaftConfigurationProto conf) { + this(request.getClientId(), request.getServerId(), request.getRaftGroupId(), + request.getCallId(), commitInfos, + group, roleInfoProto, isRaftStorageHealthy, conf); } @SuppressWarnings("parameternumber") public GroupInfoReply(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Collection<CommitInfoProto> commitInfos, - RaftGroup group, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy) { + RaftGroup group, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy, + RaftConfigurationProto conf) { super(clientId, serverId, groupId, callId, true, null, null, 0L, commitInfos); this.group = group; this.roleInfoProto = roleInfoProto; this.isRaftStorageHealthy = isRaftStorageHealthy; + this.conf = conf; } public RaftGroup getGroup() { @@ -58,4 +65,8 @@ public class GroupInfoReply extends RaftClientReply { public boolean isRaftStorageHealthy() { return isRaftStorageHealthy; } + + public Optional<RaftConfigurationProto> getConf() { + return Optional.ofNullable(conf); + } } diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index 3388f5994..8fa13b218 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -550,4 +550,5 @@ message GroupInfoReplyProto { RoleInfoProto role = 3; bool isRaftStorageHealthy = 4; repeated CommitInfoProto commitInfos = 5; + RaftConfigurationProto conf = 6; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 128900fb5..295ddbc61 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -595,8 +595,11 @@ class RaftServerImpl implements RaftServer.Division, } GroupInfoReply getGroupInfo(GroupInfoRequest request) { - return new GroupInfoReply(request, getCommitInfos(), - getGroup(), getRoleInfoProto(), state.getStorage().getStorageDir().isHealthy()); + final RaftStorageDirectory dir = state.getStorage().getStorageDir(); + final RaftConfigurationProto conf = + LogProtoUtils.toRaftConfigurationProtoBuilder(getRaftConf()).build(); + return new GroupInfoReply(request, getCommitInfos(), getGroup(), getRoleInfoProto(), + dir.isHealthy(), conf); } RoleInfoProto getRoleInfoProto() { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java index dd8c67dc8..d84c35eb0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java @@ -83,7 +83,7 @@ public final class LogProtoUtils { .build(); } - private static RaftConfigurationProto.Builder toRaftConfigurationProtoBuilder(RaftConfiguration conf) { + public static RaftConfigurationProto.Builder toRaftConfigurationProtoBuilder(RaftConfiguration conf) { return RaftConfigurationProto.newBuilder() .addAllPeers(ProtoUtils.toRaftPeerProtos(conf.getCurrentPeers())) .addAllListeners(ProtoUtils.toRaftPeerProtos(conf.getCurrentPeers(RaftPeerRole.LISTENER))) diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java index 8e92d9ace..74fcbae3d 100644 --- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java +++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java @@ -25,10 +25,12 @@ import org.apache.ratis.shell.cli.RaftUtils; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto; import org.apache.ratis.proto.RaftProtos.FollowerInfoProto; import org.apache.ratis.proto.RaftProtos.RaftPeerProto; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; +import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.function.CheckedFunction; import java.io.IOException; @@ -38,6 +40,7 @@ import java.util.*; import java.util.function.BiConsumer; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * The base class for all the ratis shell {@link Command} classes. @@ -215,4 +218,20 @@ public abstract class AbstractRatisCommand implements Command { } return ids; } + + protected Stream<RaftPeer> getPeerStream(RaftPeerRole role) { + final RaftConfigurationProto conf = groupInfoReply.getConf().orElse(null); + if (conf == null) { + // Assume all peers are followers in order preserve the pre-listener behaviors. + return role == RaftPeerRole.FOLLOWER ? getRaftGroup().getPeers().stream() : Stream.empty(); + } + final Set<RaftPeer> targets = (role == RaftPeerRole.LISTENER ? conf.getListenersList() : conf.getPeersList()) + .stream() + .map(ProtoUtils::toRaftPeer) + .collect(Collectors.toSet()); + return getRaftGroup() + .getPeers() + .stream() + .filter(targets::contains); + } } diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java index 7dba5ae9e..c71d7f89f 100644 --- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java +++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java @@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.exceptions.TransferLeadershipException; @@ -110,10 +111,11 @@ public class TransferCommand extends AbstractRatisCommand { private void setPriority(RaftClient client, RaftPeer target, int priority) throws IOException { printf("Changing priority of peer %s with address %s to %d%n", target.getId(), target.getAddress(), priority); - List<RaftPeer> peers = getRaftGroup().getPeers().stream() + final List<RaftPeer> peers = getPeerStream(RaftPeerRole.FOLLOWER) .map(peer -> peer == target ? RaftPeer.newBuilder(peer).setPriority(priority).build() : peer) .collect(Collectors.toList()); - RaftClientReply reply = client.admin().setConfiguration(peers); + final List<RaftPeer> listeners = getPeerStream(RaftPeerRole.LISTENER).collect(Collectors.toList()); + RaftClientReply reply = client.admin().setConfiguration(peers, listeners); processReply(reply, () -> "Failed to set master priorities"); } diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/AddCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/AddCommand.java index 317233531..3c65bb12d 100644 --- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/AddCommand.java +++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/AddCommand.java @@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; @@ -80,15 +81,18 @@ public class AddCommand extends AbstractRatisCommand { } try (RaftClient client = RaftUtils.createClient(getRaftGroup())) { - final Stream<RaftPeer> remaining = getRaftGroup().getPeers().stream(); + final Stream<RaftPeer> remaining = getPeerStream(RaftPeerRole.FOLLOWER); final Stream<RaftPeer> adding = ids.stream().map(raftPeerId -> RaftPeer.newBuilder() .setId(raftPeerId) .setAddress(peersInfo.get(raftPeerId)) .setPriority(0) .build()); final List<RaftPeer> peers = Stream.concat(remaining, adding).collect(Collectors.toList()); + final List<RaftPeer> listeners = getPeerStream(RaftPeerRole.LISTENER) + .collect(Collectors.toList()); System.out.println("New peer list: " + peers); - RaftClientReply reply = client.admin().setConfiguration(peers); + System.out.println("New listener list: " + listeners); + RaftClientReply reply = client.admin().setConfiguration(peers, listeners); processReply(reply, () -> "Failed to change raft peer"); } return 0; diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/RemoveCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/RemoveCommand.java index d8cc76272..591851607 100644 --- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/RemoveCommand.java +++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/RemoveCommand.java @@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; @@ -66,10 +67,13 @@ public class RemoveCommand extends AbstractRatisCommand { "Both " + PEER_ID_OPTION_NAME + " and " + ADDRESS_OPTION_NAME + " options are missing."); } try (RaftClient client = RaftUtils.createClient(getRaftGroup())) { - final List<RaftPeer> remaining = getRaftGroup().getPeers().stream() + final List<RaftPeer> peers = getPeerStream(RaftPeerRole.FOLLOWER) .filter(raftPeer -> !ids.contains(raftPeer.getId())).collect(Collectors.toList()); - System.out.println("New peer list: " + remaining); - RaftClientReply reply = client.admin().setConfiguration(remaining); + final List<RaftPeer> listeners = getPeerStream(RaftPeerRole.LISTENER) + .filter(raftPeer -> !ids.contains(raftPeer.getId())).collect(Collectors.toList()); + System.out.println("New peer list: " + peers); + System.out.println("New listener list: " + listeners); + final RaftClientReply reply = client.admin().setConfiguration(peers, listeners); processReply(reply, () -> "Failed to change raft peer"); } return 0; diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/SetPriorityCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/SetPriorityCommand.java index 43206f87b..01e81f3c3 100644 --- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/SetPriorityCommand.java +++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/SetPriorityCommand.java @@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.shell.cli.RaftUtils; @@ -28,10 +29,10 @@ import org.apache.ratis.shell.cli.sh.command.AbstractRatisCommand; import org.apache.ratis.shell.cli.sh.command.Context; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class SetPriorityCommand extends AbstractRatisCommand { @@ -55,7 +56,7 @@ public class SetPriorityCommand extends AbstractRatisCommand { Map<String, Integer> addressPriorityMap = new HashMap<>(); for (String optionValue : cl.getOptionValues(PEER_WITH_NEW_PRIORITY_OPTION_NAME)) { String[] str = optionValue.split("[|]"); - if(str.length < 2) { + if (str.length < 2) { println("The format of the parameter is wrong"); return -1; } @@ -63,18 +64,14 @@ public class SetPriorityCommand extends AbstractRatisCommand { } try (RaftClient client = RaftUtils.createClient(getRaftGroup())) { - List<RaftPeer> peers = new ArrayList<>(); - for (RaftPeer peer : getRaftGroup().getPeers()) { - if (!addressPriorityMap.containsKey(peer.getAddress())) { - peers.add(RaftPeer.newBuilder(peer).build()); - } else { - peers.add(RaftPeer.newBuilder(peer) - .setPriority(addressPriorityMap.get(peer.getAddress())) - .build() - ); - } - } - RaftClientReply reply = client.admin().setConfiguration(peers); + final List<RaftPeer> peers = getPeerStream(RaftPeerRole.FOLLOWER).map(peer -> { + final Integer newPriority = addressPriorityMap.get(peer.getAddress()); + final int priority = newPriority != null ? newPriority : peer.getPriority(); + return RaftPeer.newBuilder(peer).setPriority(priority).build(); + }).collect(Collectors.toList()); + final List<RaftPeer> listeners = + getPeerStream(RaftPeerRole.LISTENER).collect(Collectors.toList()); + RaftClientReply reply = client.admin().setConfiguration(peers, listeners); processReply(reply, () -> "Failed to set master priorities "); } return 0;
