This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 2c00073641a2d4a95fa114f6874ee898bae5f9ca
Author: qian0817 <[email protected]>
AuthorDate: Fri Jul 15 08:47:03 2022 +0800

    RATIS-1612. Support starting a server as a Listener. (#673)
    
    (cherry picked from commit 43d0275ac9515924468c24e528d4226dc7b79190)
---
 .../java/org/apache/ratis/protocol/RaftGroup.java  |  6 ++--
 .../java/org/apache/ratis/protocol/RaftPeer.java   | 33 ++++++++++++++++++----
 .../java/org/apache/ratis/util/ProtoUtils.java     |  2 ++
 ratis-proto/src/main/proto/Raft.proto              |  1 +
 .../java/org/apache/ratis/server/RaftServer.java   |  9 +++++-
 .../apache/ratis/server/impl/LeaderElection.java   |  2 +-
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  6 ++--
 .../ratis/server/impl/PeerConfiguration.java       |  2 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 25 ++++++++++------
 .../org/apache/ratis/server/impl/ServerState.java  | 11 +++++++-
 .../server/impl/SnapshotInstallationHandler.java   |  4 +--
 .../apache/ratis/server/raftlog/LogProtoUtils.java |  5 +++-
 12 files changed, 81 insertions(+), 25 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
index ef052d1c..a18aa5ce 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
@@ -39,7 +39,7 @@ public final class RaftGroup {
   }
 
   /** @return a group with the given id and peers. */
-  public static RaftGroup valueOf(RaftGroupId groupId, Collection<RaftPeer> peers) {
+  public static RaftGroup valueOf(RaftGroupId groupId, Iterable<RaftPeer> peers) {
     return new RaftGroup(groupId, peers);
   }
 
@@ -53,12 +53,12 @@ public final class RaftGroup {
     this.peers = Collections.emptyMap();
   }
 
-  private RaftGroup(RaftGroupId groupId, Collection<RaftPeer> peers) {
+  private RaftGroup(RaftGroupId groupId, Iterable<RaftPeer> peers) {
     this.groupId = Objects.requireNonNull(groupId, "groupId == null");
     Preconditions.assertTrue(!groupId.equals(EMPTY_GROUP.getGroupId()),
         () -> "Group Id " + groupId + " is reserved for the empty group.");
 
-    if (peers == null || peers.isEmpty()) {
+    if (peers == null || !peers.iterator().hasNext()) {
       this.peers = Collections.emptyMap();
     } else {
       final Map<RaftPeerId, RaftPeer> map = new HashMap<>();
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
index e35efa8f..b3cea81a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.protocol;
 
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.JavaUtils;
@@ -65,7 +66,8 @@ public final class RaftPeer {
         .setAdminAddress(peer.getAdminAddress())
         .setClientAddress(peer.getClientAddress())
         .setDataStreamAddress(peer.getDataStreamAddress())
-        .setPriority(peer.getPriority());
+        .setPriority(peer.getPriority())
+        .setStartupRole(peer.getStartupRole());
   }
 
   public static class Builder {
@@ -75,6 +77,7 @@ public final class RaftPeer {
     private String clientAddress;
     private String dataStreamAddress;
     private int priority;
+    private RaftPeerRole startupRole = RaftPeerRole.FOLLOWER;
 
     public Builder setId(RaftPeerId id) {
       this.id = id;
@@ -133,10 +136,21 @@ public final class RaftPeer {
       return this;
     }
 
+    public Builder setStartupRole(RaftPeerRole startupRole) {
+      if (startupRole != RaftPeerRole.FOLLOWER
+          && startupRole != RaftPeerRole.LISTENER) {
+        throw new IllegalArgumentException(
+            "At startup the role can only be set to FOLLOWER or LISTENER, the current value is " +
+                startupRole);
+      }
+      this.startupRole = startupRole;
+      return this;
+    }
+
     public RaftPeer build() {
       return new RaftPeer(
           Objects.requireNonNull(id, "The 'id' field is not initialized."),
-          address, adminAddress, clientAddress, dataStreamAddress, priority);
+          address, adminAddress, clientAddress, dataStreamAddress, priority, startupRole);
     }
   }
 
@@ -151,17 +165,20 @@ public final class RaftPeer {
   /** The priority of the peer. */
   private final int priority;
 
+  private final RaftPeerRole startupRole;
+
   private final Supplier<RaftPeerProto> raftPeerProto;
 
   private RaftPeer(RaftPeerId id,
       String address, String adminAddress, String clientAddress, String dataStreamAddress,
-      int priority) {
+      int priority, RaftPeerRole startupRole) {
     this.id = Objects.requireNonNull(id, "id == null");
     this.address = address;
     this.dataStreamAddress = dataStreamAddress;
     this.adminAddress = adminAddress;
     this.clientAddress = clientAddress;
     this.priority = priority;
+    this.startupRole = startupRole;
     this.raftPeerProto = JavaUtils.memoize(this::buildRaftPeerProto);
   }
 
@@ -173,6 +190,7 @@ public final class RaftPeer {
     Optional.ofNullable(getClientAddress()).ifPresent(builder::setClientAddress);
     Optional.ofNullable(getAdminAddress()).ifPresent(builder::setAdminAddress);
     builder.setPriority(priority);
+    builder.setStartupRole(startupRole);
     return builder.build();
   }
 
@@ -208,6 +226,10 @@ public final class RaftPeer {
     return priority;
   }
 
+  public RaftPeerRole getStartupRole() {
+    return startupRole;
+  }
+
   public RaftPeerProto getRaftPeerProto() {
     return raftPeerProto.get();
   }
@@ -220,8 +242,9 @@ public final class RaftPeer {
     final String client = clientAddress != null && !Objects.equals(address, clientAddress)
         ? "|client:" + clientAddress : "";
     final String data = dataStreamAddress != null? "|dataStream:" + dataStreamAddress: "";
-    final String p = "|priority:" +  priority;
-    return id + rpc + admin + client + data + p;
+    final String p = "|priority:" + priority;
+    final String role = "|startupRole:" + startupRole;
+    return id + rpc + admin + client + data + p + role;
   }
 
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 80348ff7..e57ae552 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.util;
 
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.RaftPeerIdProto;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.RouteProto;
@@ -118,6 +119,7 @@ public interface ProtoUtils {
         .setClientAddress(p.getClientAddress())
         .setAdminAddress(p.getAdminAddress())
         .setPriority(p.getPriority())
+        .setStartupRole(p.hasStartupRole() ? p.getStartupRole() : RaftProtos.RaftPeerRole.FOLLOWER)
         .build();
   }
 
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index f6b38c14..fa3d15e2 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -28,6 +28,7 @@ message RaftPeerProto {
   string dataStreamAddress = 4; // address of the data stream server
   string clientAddress = 5; // address of the client RPC server
   string adminAddress = 6; // address of the admin RPC server
+  optional RaftPeerRole startupRole = 7; // peer start up role
 }
 
 message RaftPeerIdProto {
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
index 70e4dff5..bdf4c0b4 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.rpc.RpcType;
@@ -29,6 +30,7 @@ import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.thirdparty.com.google.common.collect.Iterables;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.ReflectionUtils;
@@ -77,7 +79,12 @@ public interface RaftServer extends Closeable, RpcType.Get,
 
     /** @return the {@link RaftGroup} for this division. */
     default RaftGroup getGroup() {
-      return RaftGroup.valueOf(getMemberId().getGroupId(), getRaftConf().getAllPeers());
+      Collection<RaftPeer> allFollowerPeers =
+          getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.FOLLOWER);
+      Collection<RaftPeer> allListenerPeers =
+          getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
+      Iterable<RaftPeer> peers = Iterables.concat(allFollowerPeers, allListenerPeers);
+      return RaftGroup.valueOf(getMemberId().getGroupId(), peers);
     }
 
     /** @return the current {@link RaftConfiguration} for this division. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index a6d9b6d1..73d8c0cd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -329,7 +329,7 @@ class LeaderElection implements Runnable {
           case REJECTED:
           case DISCOVERED_A_NEW_TERM:
             final long term = r.maxTerm(server.getState().getCurrentTerm());
-            server.changeToFollowerAndPersistMetadata(term, r);
+            server.changeToFollowerAndPersistMetadata(term, false, r);
             return false;
           default: throw new IllegalArgumentException("Unable to process result " + r.result);
         }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 648a0eb3..101c1343 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -558,7 +558,7 @@ class LeaderStateImpl implements LeaderState {
 
   private void stepDown(long term, StepDownReason reason) {
     try {
-      server.changeToFollowerAndPersistMetadata(term, reason);
+      server.changeToFollowerAndPersistMetadata(term, false, reason);
       pendingStepDown.complete(server::newSuccessReply);
     } catch(IOException e) {
       final String s = this + ": Failed to persist metadata for term " + term;
@@ -953,7 +953,9 @@ class LeaderStateImpl implements LeaderState {
       final RaftPeerId followerID = followerInfo.getPeer().getId();
       final RaftPeer follower = conf.getPeer(followerID);
       if (follower == null) {
-        LOG.error("{} the follower {} is not in the conf {}", this, server.getId(), conf);
+        if (conf.getPeer(followerID, RaftPeerRole.LISTENER) == null) {
+          LOG.error("{} the follower {} is not in the conf {}", this, followerID, conf);
+        }
         continue;
       }
       final int followerPriority = follower.getPriority();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
index 83effa84..172081a6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
@@ -97,7 +97,7 @@ class PeerConfiguration {
 
   @Override
   public String toString() {
-    return peers.values().toString();
+    return "peers:" + peers.values() + "|listeners:" + listeners.values();
   }
 
   RaftPeer getPeer(RaftPeerId id, RaftPeerRole... roles) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index a24e25d8..0528080a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -494,14 +494,18 @@ class RaftServerImpl implements RaftServer.Division,
    * @param force Force to start a new {@link FollowerState} even if this server is already a follower.
    * @return if the term/votedFor should be updated to the new term
    */
-  private synchronized boolean changeToFollower(long newTerm, boolean force, Object reason) {
+  private synchronized boolean changeToFollower(
+      long newTerm,
+      boolean force,
+      boolean allowListener,
+      Object reason) {
     final RaftPeerRole old = role.getCurrentRole();
-    if (old == RaftPeerRole.LISTENER) {
+    final boolean metadataUpdated = state.updateCurrentTerm(newTerm);
+    if (old == RaftPeerRole.LISTENER && !allowListener) {
       throw new IllegalStateException("Unexpected role " + old);
     }
-    final boolean metadataUpdated = state.updateCurrentTerm(newTerm);
 
-    if (old != RaftPeerRole.FOLLOWER || force) {
+    if ((old != RaftPeerRole.FOLLOWER || force) && old != RaftPeerRole.LISTENER) {
       setRole(RaftPeerRole.FOLLOWER, reason);
       if (old == RaftPeerRole.LEADER) {
         role.shutdownLeaderState(false);
@@ -515,8 +519,11 @@ class RaftServerImpl implements RaftServer.Division,
     return metadataUpdated;
   }
 
-  synchronized void changeToFollowerAndPersistMetadata(long newTerm, Object reason) throws IOException {
-    if (changeToFollower(newTerm, false, reason)) {
+  synchronized void changeToFollowerAndPersistMetadata(
+      long newTerm,
+      boolean allowListener,
+      Object reason) throws IOException {
+    if (changeToFollower(newTerm, false, allowListener, reason)) {
       state.persistMetadata();
     }
   }
@@ -571,6 +578,7 @@ class RaftServerImpl implements RaftServer.Division,
       roleInfo.setCandidateInfo(candidate);
       break;
 
+    case LISTENER:
     case FOLLOWER:
       final Optional<FollowerState> fs = role.getFollowerState();
       final ServerRpcProto leaderInfo = ServerProtoUtils.toServerRpcProto(
@@ -1188,7 +1196,8 @@ class RaftServerImpl implements RaftServer.Division,
       final boolean voteGranted = context.decideVote(candidate, candidateLastEntry);
       if (candidate != null && phase == Phase.ELECTION) {
         // change server state in the ELECTION phase
-        final boolean termUpdated = changeToFollower(candidateTerm, true, "candidate:" + candidateId);
+        final boolean termUpdated =
+            changeToFollower(candidateTerm, true, false, "candidate:" + candidateId);
         if (voteGranted) {
           state.grantVote(candidate.getId());
         }
@@ -1346,7 +1355,7 @@ class RaftServerImpl implements RaftServer.Division,
         return CompletableFuture.completedFuture(reply);
       }
       try {
-        changeToFollowerAndPersistMetadata(leaderTerm, "appendEntries");
+        changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries");
       } catch (IOException e) {
         return JavaUtils.completeExceptionally(e);
       }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 212b6934..52aedfb1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
 
 import static org.apache.ratis.server.RaftServer.Division.LOG;
 
@@ -104,7 +105,15 @@ class ServerState implements Closeable {
       throws IOException {
     this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId());
     this.server = server;
-    final RaftConfigurationImpl initialConf = RaftConfigurationImpl.newBuilder().setConf(group.getPeers()).build();
+    Collection<RaftPeer> followerPeers = group.getPeers().stream()
+        .filter(peer -> peer.getStartupRole() == RaftPeerRole.FOLLOWER)
+        .collect(Collectors.toList());
+    Collection<RaftPeer> listenerPeers = group.getPeers().stream()
+        .filter(peer -> peer.getStartupRole() == RaftPeerRole.LISTENER)
+        .collect(Collectors.toList());
+    final RaftConfigurationImpl initialConf = RaftConfigurationImpl.newBuilder()
+        .setConf(followerPeers, listenerPeers)
+        .build();
     configurationManager = new ConfigurationManager(initialConf);
     LOG.info("{}: {}", getMemberId(), configurationManager);
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index afe6e8fa..18f3b542 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -165,7 +165,7 @@ class SnapshotInstallationHandler {
         LOG.warn("{}: Failed to recognize leader for installSnapshot chunk.", getMemberId());
         return reply;
       }
-      server.changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
+      server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
       state.setLeader(leaderId, "installSnapshot");
 
       server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
@@ -212,7 +212,7 @@ class SnapshotInstallationHandler {
         LOG.warn("{}: Failed to recognize leader for installSnapshot notification.", getMemberId());
         return reply;
       }
-      server.changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
+      server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
       state.setLeader(leaderId, "installSnapshot");
       server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
index 849dda5f..86d6fc61 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
@@ -85,7 +85,10 @@ public final class LogProtoUtils {
   private static RaftConfigurationProto.Builder toRaftConfigurationProtoBuilder(RaftConfiguration conf) {
     return RaftConfigurationProto.newBuilder()
         .addAllPeers(ProtoUtils.toRaftPeerProtos(conf.getCurrentPeers()))
-        .addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPreviousPeers()));
+        .addAllListeners(ProtoUtils.toRaftPeerProtos(conf.getCurrentPeers(RaftPeerRole.LISTENER)))
+        .addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPreviousPeers()))
+        .addAllOldListeners(
+            ProtoUtils.toRaftPeerProtos(conf.getPreviousPeers(RaftPeerRole.LISTENER)));
   }
 
   public static LogEntryProto toLogEntryProto(StateMachineLogEntryProto proto, long term, long index) {

Reply via email to