This is an automated email from the ASF dual-hosted git repository. williamsong pushed a commit to branch branch-2_readIndex in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 2e0adeb83523485584e41098abf6fa62925366ed Author: qian0817 <[email protected]> AuthorDate: Fri Mar 31 16:20:16 2023 +0800 RATIS-1824. Membership change may fail when a Listener is present in the cluster. (#865) --- .../apache/ratis/server/impl/FollowerInfoImpl.java | 16 ++++++------- .../apache/ratis/server/impl/LeaderStateImpl.java | 24 ++++++++++---------- .../ratis/server/impl/LeaderElectionTests.java | 26 ++++++++++++++++++++++ 3 files changed, 46 insertions(+), 20 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java index 891a01c76..245cbc888 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java @@ -43,11 +43,11 @@ class FollowerInfoImpl implements FollowerInfo { private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", RaftLog.INVALID_LOG_INDEX); private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX); private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L); - private volatile boolean attendVote; + private volatile boolean caughtUp; private volatile boolean ackInstallSnapshotAttempt = false; FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Function<RaftPeerId, RaftPeer> getPeer, - Timestamp lastRpcTime, long nextIndex, boolean attendVote) { + Timestamp lastRpcTime, long nextIndex, boolean caughtUp) { this.name = id + "->" + peer.getId(); this.infoIndexChange = s -> LOG.info("{}: {}", name, s); this.debugIndexChange = s -> LOG.debug("{}: {}", name, s); @@ -58,7 +58,7 @@ class FollowerInfoImpl implements FollowerInfo { this.lastRpcSendTime = new AtomicReference<>(lastRpcTime); this.lastHeartbeatSendTime = new AtomicReference<>(lastRpcTime); this.nextIndex = new RaftLogIndex("nextIndex", nextIndex); - this.attendVote = attendVote; + this.caughtUp = caughtUp; } @Override @@ -140,17 +140,17 @@ class FollowerInfoImpl implements FollowerInfo { @Override public String toString() { return name + "(c" + getCommitIndex() + ",m" + getMatchIndex() + ",n" + getNextIndex() - + ", attendVote=" + attendVote + + + ", caughtUp=" + caughtUp + ", lastRpcSendTime=" + lastRpcSendTime.get().elapsedTimeMs() + ", lastRpcResponseTime=" + lastRpcResponseTime.get().elapsedTimeMs() + ")"; } - void startAttendVote() { - attendVote = true; + void catchUp() { + caughtUp = true; } - boolean isAttendingVote() { - return attendVote; + boolean isCaughtUp() { + return caughtUp; } @Override diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 6627d8e7b..7ea4d738d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -364,7 +364,7 @@ class LeaderStateImpl implements LeaderState { final Collection<RaftPeer> listeners = conf.getAllPeers(RaftPeerRole.LISTENER); if (!listeners.isEmpty()) { - addSenders(listeners, placeHolderIndex, false); + addSenders(listeners, placeHolderIndex, true); } } @@ -429,7 +429,7 @@ class LeaderStateImpl implements LeaderState { @Override public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) { - if (isAttendingVote(follower) && followerTerm > getCurrentTerm()) { + if (isCaughtUp(follower) && followerTerm > getCurrentTerm()) { submitStepDownEvent(followerTerm, StepDownReason.HIGHER_TERM); return true; } @@ -561,7 +561,7 @@ class LeaderStateImpl implements LeaderState { @Override public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower, List<LogEntryProto> entries, TermIndex previous, long callId) { - final boolean initializing = isAttendingVote(follower); + final boolean initializing = isCaughtUp(follower); final RaftPeerId targetId = follower.getId(); return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries, ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()), @@ -581,10 +581,10 @@ class LeaderStateImpl implements LeaderState { return server.getRaftConf().getPeer(id, RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER); } - private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) { + private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean caughtUp) { final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs()); final List<LogAppender> newAppenders = newPeers.stream().map(peer -> { - final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, this::getPeer, t, nextIndex, attendVote); + final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, this::getPeer, t, nextIndex, caughtUp); followerInfoMap.put(peer.getId(), f); raftServerMetrics.addFollower(peer.getId()); logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime); @@ -726,7 +726,7 @@ class LeaderStateImpl implements LeaderState { * 3. Otherwise the peer is making progressing. Keep waiting. */ private BootStrapProgress checkProgress(FollowerInfo follower, long committed) { - Preconditions.assertTrue(!isAttendingVote(follower)); + Preconditions.assertTrue(!isCaughtUp(follower)); final Timestamp progressTime = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs()); final Timestamp timeoutTime = Timestamp.currentTime().addTimeMs(-3L * server.getMaxTimeoutMs()); if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) { @@ -744,7 +744,7 @@ class LeaderStateImpl implements LeaderState { @Override public void onFollowerSuccessAppendEntries(FollowerInfo follower) { - if (isAttendingVote(follower)) { + if (isCaughtUp(follower)) { submitUpdateCommitEvent(); } else { eventQueue.submit(checkStagingEvent); @@ -766,7 +766,7 @@ class LeaderStateImpl implements LeaderState { // check progress for the new followers final EnumSet<BootStrapProgress> reports = getLogAppenders() .map(LogAppender::getFollower) - .filter(follower -> !isAttendingVote(follower)) + .filter(follower -> !isCaughtUp(follower)) .map(follower -> checkProgress(follower, commitIndex)) .collect(Collectors.toCollection(() -> EnumSet.noneOf(BootStrapProgress.class))); if (reports.contains(BootStrapProgress.NOPROGRESS)) { @@ -778,7 +778,7 @@ class LeaderStateImpl implements LeaderState { .map(LogAppender::getFollower) .filter(f -> server.getRaftConf().containsInConf(f.getId())) .map(FollowerInfoImpl.class::cast) - .forEach(FollowerInfoImpl::startAttendVote); + .forEach(FollowerInfoImpl::catchUp); } } } @@ -1184,7 +1184,7 @@ class LeaderStateImpl implements LeaderState { void fail(BootStrapProgress progress) { final String message = this + ": Fail to set configuration " + newConf + " due to " + progress; LOG.debug(message); - stopAndRemoveSenders(s -> !isAttendingVote(s.getFollower())); + stopAndRemoveSenders(s -> !isCaughtUp(s.getFollower())); stagingState = null; // send back failure response to client's request @@ -1214,8 +1214,8 @@ class LeaderStateImpl implements LeaderState { return getLogAppenders().filter(a -> a.getFollowerId().equals(id)).findAny(); } - private static boolean isAttendingVote(FollowerInfo follower) { - return ((FollowerInfoImpl)follower).isAttendingVote(); + private static boolean isCaughtUp(FollowerInfo follower) { + return ((FollowerInfoImpl)follower).isCaughtUp(); } @Override diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index 0ea08da3e..cb23d6578 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -409,6 +409,32 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> } } + @Test + public void testAddFollowerWhenExistsListener() throws Exception { + try (final MiniRaftCluster cluster = newCluster(3, 1)) { + cluster.start(); + final RaftServer.Division leader = waitForLeader(cluster); + try (RaftClient client = cluster.createClient(leader.getId())) { + client.io().send(new RaftTestUtil.SimpleMessage("message")); + List<RaftPeer> servers = cluster.getPeers(); + Assert.assertEquals(4, servers.size()); + List<RaftPeer> listener = new ArrayList<>( + leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER)); + Assert.assertEquals(1, listener.size()); + MiniRaftCluster.PeerChanges changes = cluster.addNewPeers(1, true, false); + ArrayList<RaftPeer> newPeers = new ArrayList<>(Arrays.asList(changes.newPeers)); + newPeers.addAll(leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.FOLLOWER)); + RaftClientReply reply = client.admin().setConfiguration(newPeers, listener); + Assert.assertTrue(reply.isSuccess()); + Assert.assertEquals(4, + leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.FOLLOWER).size()); + Assert.assertEquals(1, + leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER).size()); + } + cluster.shutdown(); + } + } + @Test public void testRemoveListener() throws Exception { try(final MiniRaftCluster cluster = newCluster(3,1)) {
