This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2_tmp in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 3923d64bf02c0e2fbc0915dae5fcc0ea9b12204e Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Tue Dec 6 05:00:47 2022 -0800 RATIS-1751. Race condition between LeaderStateImpl & ServerState. (#789) (cherry picked from commit 1c00461b93a2d259bf810713b00a9791a6bd292d) --- .../apache/ratis/server/impl/LeaderStateImpl.java | 119 +++++++++++++-------- .../ratis/server/impl/PeerConfiguration.java | 7 +- .../ratis/server/impl/RaftConfigurationImpl.java | 11 +- 3 files changed, 90 insertions(+), 47 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 4a9c07bee..b55389343 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -225,6 +225,72 @@ class LeaderStateImpl implements LeaderState { } } + /** For caching {@link FollowerInfo}s. This class is immutable. */ + static class CurrentOldFollowerInfos { + private final RaftConfigurationImpl conf; + private final List<FollowerInfo> current; + private final List<FollowerInfo> old; + + CurrentOldFollowerInfos(RaftConfigurationImpl conf, List<FollowerInfo> current, List<FollowerInfo> old) { + // set null when the sizes are not the same so that it will update next time. + this.conf = isSameSize(current, conf.getConf()) && isSameSize(old, conf.getOldConf())? conf: null; + this.current = Collections.unmodifiableList(current); + this.old = old == null? null: Collections.unmodifiableList(old); + } + + RaftConfigurationImpl getConf() { + return conf; + } + + List<FollowerInfo> getCurrent() { + return current; + } + + List<FollowerInfo> getOld() { + return old; + } + } + + static boolean isSameSize(List<FollowerInfo> infos, PeerConfiguration conf) { + return conf == null? infos == null: conf.size() == infos.size(); + } + + /** Use == to compare if the confs are the same object. */ + static boolean isSameConf(CurrentOldFollowerInfos cached, RaftConfigurationImpl conf) { + return cached != null && cached.getConf() == conf; + } + + static class FollowerInfoMap { + private final Map<RaftPeerId, FollowerInfo> map = new ConcurrentHashMap<>(); + + private volatile CurrentOldFollowerInfos followerInfos; + + void put(RaftPeerId id, FollowerInfo info) { + map.put(id, info); + } + + CurrentOldFollowerInfos getFollowerInfos(RaftConfigurationImpl conf) { + final CurrentOldFollowerInfos cached = followerInfos; + if (isSameConf(cached, conf)) { + return cached; + } + + return update(conf); + } + + synchronized CurrentOldFollowerInfos update(RaftConfigurationImpl conf) { + if (!isSameConf(followerInfos, conf)) { // compare again synchronized + followerInfos = new CurrentOldFollowerInfos(conf, getFollowerInfos(conf.getConf()), + Optional.ofNullable(conf.getOldConf()).map(this::getFollowerInfos).orElse(null)); + } + return followerInfos; + } + + private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) { + return peers.streamPeerIds().map(map::get).filter(Objects::nonNull).collect(Collectors.toList()); + } + } + private final StateUpdateEvent updateCommitEvent = new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, this::updateCommit); private final StateUpdateEvent checkStagingEvent = @@ -235,8 +301,8 @@ class LeaderStateImpl implements LeaderState { private final RaftLog raftLog; private final long currentTerm; private volatile ConfigurationStagingState stagingState; - private List<List<RaftPeerId>> voterLists; - private final Map<RaftPeerId, FollowerInfo> peerIdFollowerInfoMap = new ConcurrentHashMap<>(); + + private final FollowerInfoMap followerInfoMap = new FollowerInfoMap(); /** * The list of threads appending entries to followers. @@ -299,7 +365,6 @@ class LeaderStateImpl implements LeaderState { if (!listeners.isEmpty()) { addSenders(listeners, placeHolderIndex, false, RaftPeerRole.LISTENER); } - voterLists = divideFollowers(conf); } LogEntryProto start() { @@ -472,7 +537,6 @@ class LeaderStateImpl implements LeaderState { private void updateConfiguration(long logIndex, RaftConfigurationImpl newConf) { Preconditions.assertTrue(logIndex == newConf.getLogEntryIndex()); - voterLists = divideFollowers(newConf); server.getState().setRaftConf(newConf); } @@ -508,7 +572,7 @@ class LeaderStateImpl implements LeaderState { final List<LogAppender> newAppenders = newPeers.stream() .map(peer -> { final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, t, nextIndex, attendVote); - peerIdFollowerInfoMap.put(peer.getId(), f); + followerInfoMap.put(peer.getId(), f); if (role == RaftPeerRole.FOLLOWER) { raftServerMetrics.addFollower(peer.getId()); logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime); @@ -780,7 +844,8 @@ class LeaderStateImpl implements LeaderState { final RaftPeerId selfId = server.getId(); final RaftConfigurationImpl conf = server.getRaftConf(); - final List<RaftPeerId> followers = voterLists.get(0); + final CurrentOldFollowerInfos infos = followerInfoMap.getFollowerInfos(conf); + final List<FollowerInfo> followers = infos.getCurrent(); final boolean includeSelf = conf.containsInConf(selfId); if (followers.isEmpty() && !includeSelf) { return Optional.empty(); @@ -792,7 +857,7 @@ class LeaderStateImpl implements LeaderState { if (!conf.isTransitional()) { return Optional.of(newConf); } else { // configuration is in transitional state - final List<RaftPeerId> oldFollowers = voterLists.get(1); + final List<FollowerInfo> oldFollowers = infos.getOld(); final boolean includeSelfInOldConf = conf.containsInOldConf(selfId); if (oldFollowers.isEmpty() && !includeSelfInOldConf) { return Optional.empty(); @@ -882,31 +947,14 @@ class LeaderStateImpl implements LeaderState { notifySenders(); } - private List<FollowerInfo> getFollowerInfos(List<RaftPeerId> followerIDs) { - List<FollowerInfo> followerInfos = new ArrayList<>(); - for (int i = 0; i < followerIDs.size(); i++) { - RaftPeerId id = followerIDs.get(i); - if (!peerIdFollowerInfoMap.containsKey(id)) { - throw new IllegalArgumentException("RaftPeerId:" + id + - " not in peerIdFollowerInfoMap of leader:" + server.getMemberId()); - } - - followerInfos.add(peerIdFollowerInfoMap.get(id)); - } - - return followerInfos; - } - - private long[] getSorted(List<RaftPeerId> followerIDs, boolean includeSelf, + private long[] getSorted(List<FollowerInfo> followerInfos, boolean includeSelf, ToLongFunction<FollowerInfo> getFollowerIndex, LongSupplier getLogIndex) { - final int length = includeSelf ? followerIDs.size() + 1 : followerIDs.size(); + final int length = includeSelf ? followerInfos.size() + 1 : followerInfos.size(); if (length == 0) { - throw new IllegalArgumentException("followers.size() == " - + followerIDs.size() + " and includeSelf == " + includeSelf); + throw new IllegalArgumentException("followerInfos is empty and includeSelf == " + includeSelf); } final long[] indices = new long[length]; - List<FollowerInfo> followerInfos = getFollowerInfos(followerIDs); for (int i = 0; i < followerInfos.size(); i++) { indices[i] = getFollowerIndex.applyAsLong(followerInfos.get(i)); } @@ -920,23 +968,6 @@ class LeaderStateImpl implements LeaderState { return indices; } - private List<List<RaftPeerId>> divideFollowers(RaftConfigurationImpl conf) { - List<List<RaftPeerId>> lists = new ArrayList<>(2); - List<RaftPeerId> listForNew = senders.stream() - .map(LogAppender::getFollowerId) - .filter(conf::containsInConf) - .collect(Collectors.toList()); - lists.add(listForNew); - if (conf.isTransitional()) { - List<RaftPeerId> listForOld = senders.stream() - .map(LogAppender::getFollowerId) - .filter(conf::containsInOldConf) - .collect(Collectors.toList()); - lists.add(listForOld); - } - return lists; - } - private void yieldLeaderToHigherPriorityPeer() { if (!server.getInfo().isLeader()) { return; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java index 6730b6181..38e3602e8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java @@ -31,10 +31,11 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Stream; /** * The peer configuration of a raft cluster. - * + * <p> * The objects of this class are immutable. */ class PeerConfiguration { @@ -95,6 +96,10 @@ class PeerConfiguration { return peers.size(); } + Stream<RaftPeerId> streamPeerIds() { + return peers.keySet().stream(); + } + @Override public String toString() { return "peers:" + peers.values() + "|listeners:" + listeners.values(); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java index 43818395a..3e53451f0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java @@ -34,10 +34,10 @@ import java.util.stream.Collectors; /** * The configuration of the raft cluster. - * + * <p> * The configuration is stable if there is no on-going peer change. Otherwise, * the configuration is transitional, i.e. in the middle of a peer change. - * + * <p> * The objects of this class are immutable. */ final class RaftConfigurationImpl implements RaftConfiguration { @@ -157,6 +157,13 @@ final class RaftConfigurationImpl implements RaftConfiguration { } } + PeerConfiguration getConf() { + return conf; + } + + PeerConfiguration getOldConf() { + return oldConf; + } boolean isHighestPriority(RaftPeerId peerId) { RaftPeer target = getPeer(peerId);
