This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 43d0275a RATIS-1612. Support starting a server as a Listener. (#673)
43d0275a is described below
commit 43d0275ac9515924468c24e528d4226dc7b79190
Author: qian0817 <[email protected]>
AuthorDate: Fri Jul 15 08:47:03 2022 +0800
RATIS-1612. Support starting a server as a Listener. (#673)
---
.../java/org/apache/ratis/protocol/RaftGroup.java | 6 ++--
.../java/org/apache/ratis/protocol/RaftPeer.java | 33 ++++++++++++++++++----
.../java/org/apache/ratis/util/ProtoUtils.java | 2 ++
ratis-proto/src/main/proto/Raft.proto | 1 +
.../java/org/apache/ratis/server/RaftServer.java | 9 +++++-
.../apache/ratis/server/impl/LeaderElection.java | 2 +-
.../apache/ratis/server/impl/LeaderStateImpl.java | 6 ++--
.../ratis/server/impl/PeerConfiguration.java | 2 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 25 ++++++++++------
.../org/apache/ratis/server/impl/ServerState.java | 11 +++++++-
.../server/impl/SnapshotInstallationHandler.java | 4 +--
.../apache/ratis/server/raftlog/LogProtoUtils.java | 5 +++-
12 files changed, 81 insertions(+), 25 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
index ef052d1c..a18aa5ce 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
@@ -39,7 +39,7 @@ public final class RaftGroup {
}
/** @return a group with the given id and peers. */
- public static RaftGroup valueOf(RaftGroupId groupId, Collection<RaftPeer>
peers) {
+ public static RaftGroup valueOf(RaftGroupId groupId, Iterable<RaftPeer>
peers) {
return new RaftGroup(groupId, peers);
}
@@ -53,12 +53,12 @@ public final class RaftGroup {
this.peers = Collections.emptyMap();
}
- private RaftGroup(RaftGroupId groupId, Collection<RaftPeer> peers) {
+ private RaftGroup(RaftGroupId groupId, Iterable<RaftPeer> peers) {
this.groupId = Objects.requireNonNull(groupId, "groupId == null");
Preconditions.assertTrue(!groupId.equals(EMPTY_GROUP.getGroupId()),
() -> "Group Id " + groupId + " is reserved for the empty group.");
- if (peers == null || peers.isEmpty()) {
+ if (peers == null || !peers.iterator().hasNext()) {
this.peers = Collections.emptyMap();
} else {
final Map<RaftPeerId, RaftPeer> map = new HashMap<>();
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
index e35efa8f..b3cea81a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.protocol;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.JavaUtils;
@@ -65,7 +66,8 @@ public final class RaftPeer {
.setAdminAddress(peer.getAdminAddress())
.setClientAddress(peer.getClientAddress())
.setDataStreamAddress(peer.getDataStreamAddress())
- .setPriority(peer.getPriority());
+ .setPriority(peer.getPriority())
+ .setStartupRole(peer.getStartupRole());
}
public static class Builder {
@@ -75,6 +77,7 @@ public final class RaftPeer {
private String clientAddress;
private String dataStreamAddress;
private int priority;
+ private RaftPeerRole startupRole = RaftPeerRole.FOLLOWER;
public Builder setId(RaftPeerId id) {
this.id = id;
@@ -133,10 +136,21 @@ public final class RaftPeer {
return this;
}
+ public Builder setStartupRole(RaftPeerRole startupRole) {
+ if (startupRole != RaftPeerRole.FOLLOWER
+ && startupRole != RaftPeerRole.LISTENER) {
+ throw new IllegalArgumentException(
+ "At startup the role can only be set to FOLLOWER or LISTENER, the
current value is " +
+ startupRole);
+ }
+ this.startupRole = startupRole;
+ return this;
+ }
+
public RaftPeer build() {
return new RaftPeer(
Objects.requireNonNull(id, "The 'id' field is not initialized."),
- address, adminAddress, clientAddress, dataStreamAddress, priority);
+ address, adminAddress, clientAddress, dataStreamAddress, priority,
startupRole);
}
}
@@ -151,17 +165,20 @@ public final class RaftPeer {
/** The priority of the peer. */
private final int priority;
+ private final RaftPeerRole startupRole;
+
private final Supplier<RaftPeerProto> raftPeerProto;
private RaftPeer(RaftPeerId id,
String address, String adminAddress, String clientAddress, String
dataStreamAddress,
- int priority) {
+ int priority, RaftPeerRole startupRole) {
this.id = Objects.requireNonNull(id, "id == null");
this.address = address;
this.dataStreamAddress = dataStreamAddress;
this.adminAddress = adminAddress;
this.clientAddress = clientAddress;
this.priority = priority;
+ this.startupRole = startupRole;
this.raftPeerProto = JavaUtils.memoize(this::buildRaftPeerProto);
}
@@ -173,6 +190,7 @@ public final class RaftPeer {
Optional.ofNullable(getClientAddress()).ifPresent(builder::setClientAddress);
Optional.ofNullable(getAdminAddress()).ifPresent(builder::setAdminAddress);
builder.setPriority(priority);
+ builder.setStartupRole(startupRole);
return builder.build();
}
@@ -208,6 +226,10 @@ public final class RaftPeer {
return priority;
}
+ public RaftPeerRole getStartupRole() {
+ return startupRole;
+ }
+
public RaftPeerProto getRaftPeerProto() {
return raftPeerProto.get();
}
@@ -220,8 +242,9 @@ public final class RaftPeer {
final String client = clientAddress != null && !Objects.equals(address,
clientAddress)
? "|client:" + clientAddress : "";
final String data = dataStreamAddress != null? "|dataStream:" +
dataStreamAddress: "";
- final String p = "|priority:" + priority;
- return id + rpc + admin + client + data + p;
+ final String p = "|priority:" + priority;
+ final String role = "|startupRole:" + startupRole;
+ return id + rpc + admin + client + data + p + role;
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 80348ff7..e57ae552 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.util;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RaftPeerIdProto;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.RouteProto;
@@ -118,6 +119,7 @@ public interface ProtoUtils {
.setClientAddress(p.getClientAddress())
.setAdminAddress(p.getAdminAddress())
.setPriority(p.getPriority())
+ .setStartupRole(p.hasStartupRole() ? p.getStartupRole() :
RaftProtos.RaftPeerRole.FOLLOWER)
.build();
}
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index f6b38c14..fa3d15e2 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -28,6 +28,7 @@ message RaftPeerProto {
string dataStreamAddress = 4; // address of the data stream server
string clientAddress = 5; // address of the client RPC server
string adminAddress = 6; // address of the admin RPC server
+ optional RaftPeerRole startupRole = 7; // peer start up role
}
message RaftPeerIdProto {
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
index 70e4dff5..bdf4c0b4 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.protocol.*;
import org.apache.ratis.rpc.RpcType;
@@ -29,6 +30,7 @@ import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.thirdparty.com.google.common.collect.Iterables;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.ReflectionUtils;
@@ -77,7 +79,12 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the {@link RaftGroup} for this division. */
default RaftGroup getGroup() {
- return RaftGroup.valueOf(getMemberId().getGroupId(),
getRaftConf().getAllPeers());
+ Collection<RaftPeer> allFollowerPeers =
+ getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.FOLLOWER);
+ Collection<RaftPeer> allListenerPeers =
+ getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
+ Iterable<RaftPeer> peers = Iterables.concat(allFollowerPeers,
allListenerPeers);
+ return RaftGroup.valueOf(getMemberId().getGroupId(), peers);
}
/** @return the current {@link RaftConfiguration} for this division. */
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index a6d9b6d1..73d8c0cd 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -329,7 +329,7 @@ class LeaderElection implements Runnable {
case REJECTED:
case DISCOVERED_A_NEW_TERM:
final long term = r.maxTerm(server.getState().getCurrentTerm());
- server.changeToFollowerAndPersistMetadata(term, r);
+ server.changeToFollowerAndPersistMetadata(term, false, r);
return false;
default: throw new IllegalArgumentException("Unable to process
result " + r.result);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 648a0eb3..101c1343 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -558,7 +558,7 @@ class LeaderStateImpl implements LeaderState {
private void stepDown(long term, StepDownReason reason) {
try {
- server.changeToFollowerAndPersistMetadata(term, reason);
+ server.changeToFollowerAndPersistMetadata(term, false, reason);
pendingStepDown.complete(server::newSuccessReply);
} catch(IOException e) {
final String s = this + ": Failed to persist metadata for term " + term;
@@ -953,7 +953,9 @@ class LeaderStateImpl implements LeaderState {
final RaftPeerId followerID = followerInfo.getPeer().getId();
final RaftPeer follower = conf.getPeer(followerID);
if (follower == null) {
- LOG.error("{} the follower {} is not in the conf {}", this,
server.getId(), conf);
+ if (conf.getPeer(followerID, RaftPeerRole.LISTENER) == null) {
+ LOG.error("{} the follower {} is not in the conf {}", this,
followerID, conf);
+ }
continue;
}
final int followerPriority = follower.getPriority();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
index 83effa84..172081a6 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
@@ -97,7 +97,7 @@ class PeerConfiguration {
@Override
public String toString() {
- return peers.values().toString();
+ return "peers:" + peers.values() + "|listeners:" + listeners.values();
}
RaftPeer getPeer(RaftPeerId id, RaftPeerRole... roles) {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index a24e25d8..0528080a 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -494,14 +494,18 @@ class RaftServerImpl implements RaftServer.Division,
* @param force Force to start a new {@link FollowerState} even if this
server is already a follower.
* @return if the term/votedFor should be updated to the new term
*/
- private synchronized boolean changeToFollower(long newTerm, boolean force,
Object reason) {
+ private synchronized boolean changeToFollower(
+ long newTerm,
+ boolean force,
+ boolean allowListener,
+ Object reason) {
final RaftPeerRole old = role.getCurrentRole();
- if (old == RaftPeerRole.LISTENER) {
+ final boolean metadataUpdated = state.updateCurrentTerm(newTerm);
+ if (old == RaftPeerRole.LISTENER && !allowListener) {
throw new IllegalStateException("Unexpected role " + old);
}
- final boolean metadataUpdated = state.updateCurrentTerm(newTerm);
- if (old != RaftPeerRole.FOLLOWER || force) {
+ if ((old != RaftPeerRole.FOLLOWER || force) && old !=
RaftPeerRole.LISTENER) {
setRole(RaftPeerRole.FOLLOWER, reason);
if (old == RaftPeerRole.LEADER) {
role.shutdownLeaderState(false);
@@ -515,8 +519,11 @@ class RaftServerImpl implements RaftServer.Division,
return metadataUpdated;
}
- synchronized void changeToFollowerAndPersistMetadata(long newTerm, Object
reason) throws IOException {
- if (changeToFollower(newTerm, false, reason)) {
+ synchronized void changeToFollowerAndPersistMetadata(
+ long newTerm,
+ boolean allowListener,
+ Object reason) throws IOException {
+ if (changeToFollower(newTerm, false, allowListener, reason)) {
state.persistMetadata();
}
}
@@ -571,6 +578,7 @@ class RaftServerImpl implements RaftServer.Division,
roleInfo.setCandidateInfo(candidate);
break;
+ case LISTENER:
case FOLLOWER:
final Optional<FollowerState> fs = role.getFollowerState();
final ServerRpcProto leaderInfo = ServerProtoUtils.toServerRpcProto(
@@ -1188,7 +1196,8 @@ class RaftServerImpl implements RaftServer.Division,
final boolean voteGranted = context.decideVote(candidate,
candidateLastEntry);
if (candidate != null && phase == Phase.ELECTION) {
// change server state in the ELECTION phase
- final boolean termUpdated = changeToFollower(candidateTerm, true,
"candidate:" + candidateId);
+ final boolean termUpdated =
+ changeToFollower(candidateTerm, true, false, "candidate:" +
candidateId);
if (voteGranted) {
state.grantVote(candidate.getId());
}
@@ -1346,7 +1355,7 @@ class RaftServerImpl implements RaftServer.Division,
return CompletableFuture.completedFuture(reply);
}
try {
- changeToFollowerAndPersistMetadata(leaderTerm, "appendEntries");
+ changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries");
} catch (IOException e) {
return JavaUtils.completeExceptionally(e);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 212b6934..52aedfb1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
import static org.apache.ratis.server.RaftServer.Division.LOG;
@@ -104,7 +105,15 @@ class ServerState implements Closeable {
throws IOException {
this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId());
this.server = server;
- final RaftConfigurationImpl initialConf =
RaftConfigurationImpl.newBuilder().setConf(group.getPeers()).build();
+ Collection<RaftPeer> followerPeers = group.getPeers().stream()
+ .filter(peer -> peer.getStartupRole() == RaftPeerRole.FOLLOWER)
+ .collect(Collectors.toList());
+ Collection<RaftPeer> listenerPeers = group.getPeers().stream()
+ .filter(peer -> peer.getStartupRole() == RaftPeerRole.LISTENER)
+ .collect(Collectors.toList());
+ final RaftConfigurationImpl initialConf =
RaftConfigurationImpl.newBuilder()
+ .setConf(followerPeers, listenerPeers)
+ .build();
configurationManager = new ConfigurationManager(initialConf);
LOG.info("{}: {}", getMemberId(), configurationManager);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index afe6e8fa..18f3b542 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -165,7 +165,7 @@ class SnapshotInstallationHandler {
LOG.warn("{}: Failed to recognize leader for installSnapshot chunk.",
getMemberId());
return reply;
}
- server.changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
+ server.changeToFollowerAndPersistMetadata(leaderTerm, true,
"installSnapshot");
state.setLeader(leaderId, "installSnapshot");
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
@@ -212,7 +212,7 @@ class SnapshotInstallationHandler {
LOG.warn("{}: Failed to recognize leader for installSnapshot
notification.", getMemberId());
return reply;
}
- server.changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
+ server.changeToFollowerAndPersistMetadata(leaderTerm, true,
"installSnapshot");
state.setLeader(leaderId, "installSnapshot");
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
index 849dda5f..86d6fc61 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
@@ -85,7 +85,10 @@ public final class LogProtoUtils {
private static RaftConfigurationProto.Builder
toRaftConfigurationProtoBuilder(RaftConfiguration conf) {
return RaftConfigurationProto.newBuilder()
.addAllPeers(ProtoUtils.toRaftPeerProtos(conf.getCurrentPeers()))
- .addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPreviousPeers()));
+
.addAllListeners(ProtoUtils.toRaftPeerProtos(conf.getCurrentPeers(RaftPeerRole.LISTENER)))
+ .addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPreviousPeers()))
+ .addAllOldListeners(
+
ProtoUtils.toRaftPeerProtos(conf.getPreviousPeers(RaftPeerRole.LISTENER)));
}
public static LogEntryProto toLogEntryProto(StateMachineLogEntryProto proto,
long term, long index) {