This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 2c00073641a2d4a95fa114f6874ee898bae5f9ca Author: qian0817 <[email protected]> AuthorDate: Fri Jul 15 08:47:03 2022 +0800 RATIS-1612. Support starting a server as a Listener. (#673) (cherry picked from commit 43d0275ac9515924468c24e528d4226dc7b79190) --- .../java/org/apache/ratis/protocol/RaftGroup.java | 6 ++-- .../java/org/apache/ratis/protocol/RaftPeer.java | 33 ++++++++++++++++++---- .../java/org/apache/ratis/util/ProtoUtils.java | 2 ++ ratis-proto/src/main/proto/Raft.proto | 1 + .../java/org/apache/ratis/server/RaftServer.java | 9 +++++- .../apache/ratis/server/impl/LeaderElection.java | 2 +- .../apache/ratis/server/impl/LeaderStateImpl.java | 6 ++-- .../ratis/server/impl/PeerConfiguration.java | 2 +- .../apache/ratis/server/impl/RaftServerImpl.java | 25 ++++++++++------ .../org/apache/ratis/server/impl/ServerState.java | 11 +++++++- .../server/impl/SnapshotInstallationHandler.java | 4 +-- .../apache/ratis/server/raftlog/LogProtoUtils.java | 5 +++- 12 files changed, 81 insertions(+), 25 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java index ef052d1c..a18aa5ce 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java @@ -39,7 +39,7 @@ public final class RaftGroup { } /** @return a group with the given id and peers. */ - public static RaftGroup valueOf(RaftGroupId groupId, Collection<RaftPeer> peers) { + public static RaftGroup valueOf(RaftGroupId groupId, Iterable<RaftPeer> peers) { return new RaftGroup(groupId, peers); } @@ -53,12 +53,12 @@ public final class RaftGroup { this.peers = Collections.emptyMap(); } - private RaftGroup(RaftGroupId groupId, Collection<RaftPeer> peers) { + private RaftGroup(RaftGroupId groupId, Iterable<RaftPeer> peers) { this.groupId = Objects.requireNonNull(groupId, "groupId == null"); Preconditions.assertTrue(!groupId.equals(EMPTY_GROUP.getGroupId()), () -> "Group Id " + groupId + " is reserved for the empty group."); - if (peers == null || peers.isEmpty()) { + if (peers == null || !peers.iterator().hasNext()) { this.peers = Collections.emptyMap(); } else { final Map<RaftPeerId, RaftPeer> map = new HashMap<>(); diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java index e35efa8f..b3cea81a 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.protocol; +import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.proto.RaftProtos.RaftPeerProto; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.JavaUtils; @@ -65,7 +66,8 @@ public final class RaftPeer { .setAdminAddress(peer.getAdminAddress()) .setClientAddress(peer.getClientAddress()) .setDataStreamAddress(peer.getDataStreamAddress()) - .setPriority(peer.getPriority()); + .setPriority(peer.getPriority()) + .setStartupRole(peer.getStartupRole()); } public static class Builder { @@ -75,6 +77,7 @@ public final class RaftPeer { private String clientAddress; private String dataStreamAddress; private int priority; + private RaftPeerRole startupRole = RaftPeerRole.FOLLOWER; public Builder setId(RaftPeerId id) { this.id = id; @@ -133,10 +136,21 @@ public final class RaftPeer { return this; } + public Builder setStartupRole(RaftPeerRole startupRole) { + if (startupRole != RaftPeerRole.FOLLOWER + && startupRole != RaftPeerRole.LISTENER) { + throw new IllegalArgumentException( + "At startup the role can only be set to FOLLOWER or LISTENER, the current value is " + + startupRole); + } + this.startupRole = startupRole; + return this; + } + public RaftPeer build() { return new RaftPeer( Objects.requireNonNull(id, "The 'id' field is not initialized."), - address, adminAddress, clientAddress, dataStreamAddress, priority); + address, adminAddress, clientAddress, dataStreamAddress, priority, startupRole); } } @@ -151,17 +165,20 @@ public final class RaftPeer { /** The priority of the peer. */ private final int priority; + private final RaftPeerRole startupRole; + private final Supplier<RaftPeerProto> raftPeerProto; private RaftPeer(RaftPeerId id, String address, String adminAddress, String clientAddress, String dataStreamAddress, - int priority) { + int priority, RaftPeerRole startupRole) { this.id = Objects.requireNonNull(id, "id == null"); this.address = address; this.dataStreamAddress = dataStreamAddress; this.adminAddress = adminAddress; this.clientAddress = clientAddress; this.priority = priority; + this.startupRole = startupRole; this.raftPeerProto = JavaUtils.memoize(this::buildRaftPeerProto); } @@ -173,6 +190,7 @@ public final class RaftPeer { Optional.ofNullable(getClientAddress()).ifPresent(builder::setClientAddress); Optional.ofNullable(getAdminAddress()).ifPresent(builder::setAdminAddress); builder.setPriority(priority); + builder.setStartupRole(startupRole); return builder.build(); } @@ -208,6 +226,10 @@ public final class RaftPeer { return priority; } + public RaftPeerRole getStartupRole() { + return startupRole; + } + public RaftPeerProto getRaftPeerProto() { return raftPeerProto.get(); } @@ -220,8 +242,9 @@ public final class RaftPeer { final String client = clientAddress != null && !Objects.equals(address, clientAddress) ? "|client:" + clientAddress : ""; final String data = dataStreamAddress != null? "|dataStream:" + dataStreamAddress: ""; - final String p = "|priority:" + priority; - return id + rpc + admin + client + data + p; + final String p = "|priority:" + priority; + final String role = "|startupRole:" + startupRole; + return id + rpc + admin + client + data + p + role; } @Override diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java index 80348ff7..e57ae552 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.util; +import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos.RaftPeerIdProto; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.proto.RaftProtos.RouteProto; @@ -118,6 +119,7 @@ public interface ProtoUtils { .setClientAddress(p.getClientAddress()) .setAdminAddress(p.getAdminAddress()) .setPriority(p.getPriority()) + .setStartupRole(p.hasStartupRole() ? p.getStartupRole() : RaftProtos.RaftPeerRole.FOLLOWER) .build(); } diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index f6b38c14..fa3d15e2 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -28,6 +28,7 @@ message RaftPeerProto { string dataStreamAddress = 4; // address of the data stream server string clientAddress = 5; // address of the client RPC server string adminAddress = 6; // address of the admin RPC server + optional RaftPeerRole startupRole = 7; // peer start up role } message RaftPeerIdProto { diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java index 70e4dff5..bdf4c0b4 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java @@ -20,6 +20,7 @@ package org.apache.ratis.server; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.protocol.*; import org.apache.ratis.rpc.RpcType; @@ -29,6 +30,7 @@ import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.thirdparty.com.google.common.collect.Iterables; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.ReflectionUtils; @@ -77,7 +79,12 @@ public interface RaftServer extends Closeable, RpcType.Get, /** @return the {@link RaftGroup} for this division. */ default RaftGroup getGroup() { - return RaftGroup.valueOf(getMemberId().getGroupId(), getRaftConf().getAllPeers()); + Collection<RaftPeer> allFollowerPeers = + getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.FOLLOWER); + Collection<RaftPeer> allListenerPeers = + getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER); + Iterable<RaftPeer> peers = Iterables.concat(allFollowerPeers, allListenerPeers); + return RaftGroup.valueOf(getMemberId().getGroupId(), peers); } /** @return the current {@link RaftConfiguration} for this division. */ diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index a6d9b6d1..73d8c0cd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -329,7 +329,7 @@ class LeaderElection implements Runnable { case REJECTED: case DISCOVERED_A_NEW_TERM: final long term = r.maxTerm(server.getState().getCurrentTerm()); - server.changeToFollowerAndPersistMetadata(term, r); + server.changeToFollowerAndPersistMetadata(term, false, r); return false; default: throw new IllegalArgumentException("Unable to process result " + r.result); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 648a0eb3..101c1343 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -558,7 +558,7 @@ class LeaderStateImpl implements LeaderState { private void stepDown(long term, StepDownReason reason) { try { - server.changeToFollowerAndPersistMetadata(term, reason); + server.changeToFollowerAndPersistMetadata(term, false, reason); pendingStepDown.complete(server::newSuccessReply); } catch(IOException e) { final String s = this + ": Failed to persist metadata for term " + term; @@ -953,7 +953,9 @@ class LeaderStateImpl implements LeaderState { final RaftPeerId followerID = followerInfo.getPeer().getId(); final RaftPeer follower = conf.getPeer(followerID); if (follower == null) { - LOG.error("{} the follower {} is not in the conf {}", this, server.getId(), conf); + if (conf.getPeer(followerID, RaftPeerRole.LISTENER) == null) { + LOG.error("{} the follower {} is not in the conf {}", this, followerID, conf); + } continue; } final int followerPriority = follower.getPriority(); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java index 83effa84..172081a6 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java @@ -97,7 +97,7 @@ class PeerConfiguration { @Override public String toString() { - return peers.values().toString(); + return "peers:" + peers.values() + "|listeners:" + listeners.values(); } RaftPeer getPeer(RaftPeerId id, RaftPeerRole... roles) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index a24e25d8..0528080a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -494,14 +494,18 @@ class RaftServerImpl implements RaftServer.Division, * @param force Force to start a new {@link FollowerState} even if this server is already a follower. * @return if the term/votedFor should be updated to the new term */ - private synchronized boolean changeToFollower(long newTerm, boolean force, Object reason) { + private synchronized boolean changeToFollower( + long newTerm, + boolean force, + boolean allowListener, + Object reason) { final RaftPeerRole old = role.getCurrentRole(); - if (old == RaftPeerRole.LISTENER) { + final boolean metadataUpdated = state.updateCurrentTerm(newTerm); + if (old == RaftPeerRole.LISTENER && !allowListener) { throw new IllegalStateException("Unexpected role " + old); } - final boolean metadataUpdated = state.updateCurrentTerm(newTerm); - if (old != RaftPeerRole.FOLLOWER || force) { + if ((old != RaftPeerRole.FOLLOWER || force) && old != RaftPeerRole.LISTENER) { setRole(RaftPeerRole.FOLLOWER, reason); if (old == RaftPeerRole.LEADER) { role.shutdownLeaderState(false); @@ -515,8 +519,11 @@ class RaftServerImpl implements RaftServer.Division, return metadataUpdated; } - synchronized void changeToFollowerAndPersistMetadata(long newTerm, Object reason) throws IOException { - if (changeToFollower(newTerm, false, reason)) { + synchronized void changeToFollowerAndPersistMetadata( + long newTerm, + boolean allowListener, + Object reason) throws IOException { + if (changeToFollower(newTerm, false, allowListener, reason)) { state.persistMetadata(); } } @@ -571,6 +578,7 @@ class RaftServerImpl implements RaftServer.Division, roleInfo.setCandidateInfo(candidate); break; + case LISTENER: case FOLLOWER: final Optional<FollowerState> fs = role.getFollowerState(); final ServerRpcProto leaderInfo = ServerProtoUtils.toServerRpcProto( @@ -1188,7 +1196,8 @@ class RaftServerImpl implements RaftServer.Division, final boolean voteGranted = context.decideVote(candidate, candidateLastEntry); if (candidate != null && phase == Phase.ELECTION) { // change server state in the ELECTION phase - final boolean termUpdated = changeToFollower(candidateTerm, true, "candidate:" + candidateId); + final boolean termUpdated = + changeToFollower(candidateTerm, true, false, "candidate:" + candidateId); if (voteGranted) { state.grantVote(candidate.getId()); } @@ -1346,7 +1355,7 @@ class RaftServerImpl implements RaftServer.Division, return CompletableFuture.completedFuture(reply); } try { - changeToFollowerAndPersistMetadata(leaderTerm, "appendEntries"); + changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries"); } catch (IOException e) { return JavaUtils.completeExceptionally(e); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 212b6934..52aedfb1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.LongSupplier; +import java.util.stream.Collectors; import static org.apache.ratis.server.RaftServer.Division.LOG; @@ -104,7 +105,15 @@ class ServerState implements Closeable { throws IOException { this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId()); this.server = server; - final RaftConfigurationImpl initialConf = RaftConfigurationImpl.newBuilder().setConf(group.getPeers()).build(); + Collection<RaftPeer> followerPeers = group.getPeers().stream() + .filter(peer -> peer.getStartupRole() == RaftPeerRole.FOLLOWER) + .collect(Collectors.toList()); + Collection<RaftPeer> listenerPeers = group.getPeers().stream() + .filter(peer -> peer.getStartupRole() == RaftPeerRole.LISTENER) + .collect(Collectors.toList()); + final RaftConfigurationImpl initialConf = RaftConfigurationImpl.newBuilder() + .setConf(followerPeers, listenerPeers) + .build(); configurationManager = new ConfigurationManager(initialConf); LOG.info("{}: {}", getMemberId(), configurationManager); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index afe6e8fa..18f3b542 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -165,7 +165,7 @@ class SnapshotInstallationHandler { LOG.warn("{}: Failed to recognize leader for installSnapshot chunk.", getMemberId()); return reply; } - server.changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot"); + server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); state.setLeader(leaderId, "installSnapshot"); server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START); @@ -212,7 +212,7 @@ class SnapshotInstallationHandler { LOG.warn("{}: Failed to recognize leader for installSnapshot notification.", getMemberId()); return reply; } - server.changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot"); + server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); state.setLeader(leaderId, "installSnapshot"); server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java index 849dda5f..86d6fc61 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java @@ -85,7 +85,10 @@ public final class LogProtoUtils { private static RaftConfigurationProto.Builder toRaftConfigurationProtoBuilder(RaftConfiguration conf) { return RaftConfigurationProto.newBuilder() .addAllPeers(ProtoUtils.toRaftPeerProtos(conf.getCurrentPeers())) - .addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPreviousPeers())); + .addAllListeners(ProtoUtils.toRaftPeerProtos(conf.getCurrentPeers(RaftPeerRole.LISTENER))) + .addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPreviousPeers())) + .addAllOldListeners( + ProtoUtils.toRaftPeerProtos(conf.getPreviousPeers(RaftPeerRole.LISTENER))); } public static LogEntryProto toLogEntryProto(StateMachineLogEntryProto proto, long term, long index) {
