Repository: incubator-ratis Updated Branches: refs/heads/master c3845bc3f -> 9b84d79cf
RATIS-337. In RaftServerImpl, leaderState/heartbeatMonitor may be accessed without proper null check. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/9b84d79c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/9b84d79c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/9b84d79c Branch: refs/heads/master Commit: 9b84d79cf9b305aff99b65c782ad340671a58c87 Parents: c3845bc Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Mon Oct 8 14:48:30 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Mon Oct 8 14:48:30 2018 +0800 ---------------------------------------------------------------------- .../ratis/server/impl/RaftServerImpl.java | 200 ++++++------------- .../org/apache/ratis/server/impl/RoleInfo.java | 82 +++++++- .../ratis/server/impl/ServerProtoUtils.java | 10 + .../apache/ratis/server/impl/ServerState.java | 9 +- .../ratis/server/impl/RaftServerTestUtil.java | 2 +- 5 files changed, 162 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9b84d79c/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 5e7bd89..2d7f85a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -45,7 +45,6 @@ import java.util.concurrent.*; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; import static org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; @@ -73,15 +72,6 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou private final Supplier<RaftPeer> peerSupplier = JavaUtils.memoize(() -> new RaftPeer(getId(), getServerRpc().getInetSocketAddress())); private final RoleInfo role; - /** used when the peer is follower, to monitor election timeout */ - private volatile FollowerState heartbeatMonitor; - - /** used when the peer is candidate, to request votes from other peers */ - private volatile LeaderElection electionDaemon; - - /** used when the peer is leader */ - private volatile LeaderState leaderState; - private final RetryCache retryCache; private final CommitInfoCache commitInfoCache = new CommitInfoCache(); @@ -93,7 +83,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou this.groupId = group.getGroupId(); this.lifeCycle = new LifeCycle(id); this.stateMachine = stateMachine; - this.role = new RoleInfo(); + this.role = new RoleInfo(id); final RaftProperties properties = proxy.getProperties(); minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toInt(TimeUnit.MILLISECONDS); @@ -199,7 +189,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou */ private void startAsFollower() { setRole(RaftPeerRole.FOLLOWER, "startAsFollower"); - startHeartbeatMonitor(); + role.startFollowerState(this); lifeCycle.transition(RUNNING); } @@ -210,23 +200,19 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou */ private void startInitializing() { setRole(RaftPeerRole.FOLLOWER, "startInitializing"); - // do not start heartbeatMonitoring + // do not start FollowerState } public ServerState getState() { return state; } - LeaderState getLeaderState() { - return leaderState; - } - public RaftPeerId getId() { return getState().getSelfId(); } - RaftPeerRole getRole() { - return role.getCurrentRole(); + RoleInfo getRole() { + return role; } RaftConfiguration getRaftConf() { @@ -246,19 +232,19 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou LOG.warn("Failed to un-register RaftServer JMX bean for " + getId(), ignored); } try { - shutdownHeartbeatMonitor(); + role.shutdownFollowerState(); } catch (Exception ignored) { - LOG.warn("Failed to shutdown heartbeat monitor for " + getId(), ignored); + LOG.warn("Failed to shutdown FollowerState for " + getId(), ignored); } try{ - shutdownElectionDaemon(); + role.shutdownLeaderElection(); } catch (Exception ignored) { - LOG.warn("Failed to shutdown election daemon for " + getId(), ignored); + LOG.warn("Failed to shutdown LeaderElection for " + getId(), ignored); } try{ - shutdownLeaderState(true); + role.shutdownLeaderState(true); } catch (Exception ignored) { - LOG.warn("Failed to shutdown leader state monitor for " + getId(), ignored); + LOG.warn("Failed to shutdown LeaderState monitor for " + getId(), ignored); } try{ state.close(); @@ -305,11 +291,11 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou if (old != RaftPeerRole.FOLLOWER) { setRole(RaftPeerRole.FOLLOWER, "changeToFollower"); if (old == RaftPeerRole.LEADER) { - shutdownLeaderState(false); + role.shutdownLeaderState(false); } else if (old == RaftPeerRole.CANDIDATE) { - shutdownElectionDaemon(); + role.shutdownLeaderElection(); } - startHeartbeatMonitor(); + role.startFollowerState(this); } return metadataUpdated; } @@ -320,55 +306,17 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } } - private synchronized void shutdownLeaderState(boolean allowNull) { - if (leaderState == null) { - if (!allowNull) { - throw new NullPointerException("leaderState == null"); - } - } else { - leaderState.stop(); - leaderState = null; - } - // TODO: make sure that StateMachineUpdater has applied all transactions that have context - } - - private void shutdownElectionDaemon() { - final LeaderElection election = electionDaemon; - if (election != null) { - election.stopRunning(); - // no need to interrupt the election thread - } - electionDaemon = null; - } - synchronized void changeToLeader() { Preconditions.assertTrue(isCandidate()); - shutdownElectionDaemon(); + role.shutdownLeaderElection(); setRole(RaftPeerRole.LEADER, "changeToLeader"); state.becomeLeader(); // start sending AppendEntries RPC to followers - leaderState = new LeaderState(this, getProxy().getProperties()); - final LogEntryProto e = leaderState.start(); + final LogEntryProto e = role.startLeaderState(this, getProxy().getProperties()); getState().setRaftConf(e.getIndex(), ServerProtoUtils.toRaftConfiguration(e)); } - private void startHeartbeatMonitor() { - Preconditions.assertTrue(heartbeatMonitor == null, "heartbeatMonitor != null"); - LOG.debug("{} starts heartbeatMonitor", getId()); - heartbeatMonitor = new FollowerState(this); - heartbeatMonitor.start(); - } - - private void shutdownHeartbeatMonitor() { - final FollowerState hm = heartbeatMonitor; - if (hm != null) { - hm.stopRunning(); - hm.interrupt(); - } - heartbeatMonitor = null; - } - Collection<CommitInfoProto> getCommitInfos() { final List<CommitInfoProto> infos = new ArrayList<>(); // add the commit info of this server @@ -376,7 +324,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou // add the commit infos of other servers if (isLeader()) { - Optional.ofNullable(leaderState).ifPresent( + role.getLeaderState().ifPresent( leader -> leader.updateFollowerCommitInfos(commitInfoCache, infos)); } else { getRaftConf().getPeers().stream() @@ -408,22 +356,23 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou break; case FOLLOWER: - FollowerInfoProto.Builder follower = FollowerInfoProto.newBuilder() - .setLeaderInfo(getServerRpcProto( - getRaftConf().getPeer(state.getLeaderId()), - heartbeatMonitor.getLastRpcTime().elapsedTimeMs())) - .setInLogSync(heartbeatMonitor.isInLogSync()); - roleInfo.setFollowerInfo(follower); + role.getFollowerState().ifPresent(fs -> { + final ServerRpcProto leaderInfo = ServerProtoUtils.toServerRpcProto( + getRaftConf().getPeer(state.getLeaderId()), fs.getLastRpcTime().elapsedTimeMs()); + roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder() + .setLeaderInfo(leaderInfo) + .setInLogSync(fs.isInLogSync())); + }); break; case LEADER: - LeaderInfoProto.Builder leader = LeaderInfoProto.newBuilder(); - Stream<LogAppender> stream = getLeaderState().getLogAppenders(); - stream.forEach(appender -> - leader.addFollowerInfo(getServerRpcProto( - appender.getFollower().getPeer(), - appender.getFollower().getLastRpcResponseTime().elapsedTimeMs()))); - roleInfo.setLeaderInfo(leader); + role.getLeaderState().ifPresent(ls -> { + final LeaderInfoProto.Builder leader = LeaderInfoProto.newBuilder(); + ls.getLogAppenders().map(LogAppender::getFollower).forEach(f -> + leader.addFollowerInfo(ServerProtoUtils.toServerRpcProto( + f.getPeer(), f.getLastRpcResponseTime().elapsedTimeMs()))); + roleInfo.setLeaderInfo(leader); + }); break; default: @@ -432,27 +381,15 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou return roleInfo.build(); } - private ServerRpcProto getServerRpcProto(RaftPeer peer, long delay) { - if (peer == null) { - // if no peer information return empty - return ServerRpcProto.getDefaultInstance(); - } - return ServerRpcProto.newBuilder() - .setId(ProtoUtils.toRaftPeerProto(peer)) - .setLastRpcElapsedTimeMs(delay) - .build(); - } - synchronized void changeToCandidate() { Preconditions.assertTrue(isFollower()); - shutdownHeartbeatMonitor(); + role.shutdownFollowerState(); setRole(RaftPeerRole.CANDIDATE, "changeToCandidate"); if (state.checkForExtendedNoLeader()) { stateMachine.notifyExtendedNoLeader(getGroup(), getRoleInfoProto()); } // start election - electionDaemon = new LeaderElection(this); - electionDaemon.start(); + role.startLeaderElection(this); } @Override @@ -476,7 +413,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou NotLeaderException exception = generateNotLeaderException(); final RaftClientReply reply = new RaftClientReply(request, exception, getCommitInfos()); return RetryCache.failWithReply(reply, entry); - } else if (leaderState == null || !leaderState.isReady()) { + } + final LeaderState leaderState = role.getLeaderState().orElse(null); + if (leaderState == null || !leaderState.isReady()) { RetryCache.CacheEntry cacheEntry = retryCache.get(request.getClientId(), request.getCallId()); if (cacheEntry != null && cacheEntry.isCompletedNormally()) { return cacheEntry.getReplyFuture(); @@ -534,6 +473,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } // append the message to its local log + final LeaderState leaderState = role.getLeaderStateNonNull(); final long entryIndex; try { entryIndex = state.applyLog(context, request.getClientId(), @@ -696,6 +636,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } final RaftConfiguration current = getRaftConf(); + final LeaderState leaderState = role.getLeaderStateNonNull(); // make sure there is no other raft reconfiguration in progress if (!current.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) { throw new ReconfigurationInProgressException( @@ -723,7 +664,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } else if (isLeader()) { return true; } else { - return isFollower() && state.hasLeader() && heartbeatMonitor.shouldWithholdVotes(); + // following a leader and not yet timeout + return isFollower() && state.hasLeader() + && role.getFollowerState().map(FollowerState::shouldWithholdVotes).orElse(false); } } @@ -742,7 +685,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou && getState().isConfCommitted() && !getRaftConf().containsInConf(candidateId) && candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex() - && !leaderState.isBootStrappingPeer(candidateId); + && role.getLeaderState().map(ls -> !ls.isBootStrappingPeer(candidateId)).orElse(false); } @Override @@ -769,15 +712,16 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou boolean shouldShutdown = false; final RequestVoteReplyProto reply; synchronized (this) { + final FollowerState fs = role.getFollowerState().orElse(null); if (shouldWithholdVotes(candidateTerm)) { LOG.info("{}-{}: Withhold vote from candidate {} with term {}. State: leader={}, term={}, lastRpcElapsed={}", getId(), role, candidateId, candidateTerm, state.getLeaderId(), state.getCurrentTerm(), - isFollower()? heartbeatMonitor.getLastRpcTime().elapsedTimeMs() + "ms": null); + fs != null? fs.getLastRpcTime().elapsedTimeMs() + "ms": null); } else if (state.recognizeCandidate(candidateId, candidateTerm)) { final boolean termUpdated = changeToFollower(candidateTerm); // see Section 5.4.1 Election restriction - if (state.isLogUpToDate(candidateLastEntry)) { - heartbeatMonitor.updateLastRpcTime(false); + if (state.isLogUpToDate(candidateLastEntry) && fs != null) { + fs.updateLastRpcTime(false); state.grantVote(candidateId); voteGranted = true; } @@ -864,6 +808,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } } + private void updateLastRpcTime(boolean inLogSync) { + if (lifeCycle.getCurrentState() == RUNNING) { + role.getFollowerState().ifPresent(fs -> fs.updateLastRpcTime(inLogSync)); + } + } + private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync( RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing, @@ -912,11 +862,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou state.setLeader(leaderId, "appendEntries"); if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) { - startHeartbeatMonitor(); - } - if (lifeCycle.getCurrentState() == RUNNING) { - heartbeatMonitor.updateLastRpcTime(true); + role.startFollowerState(this); } + updateLastRpcTime(true); // We need to check if "previous" is in the local peer. Note that it is // possible that "previous" is covered by the latest snapshot: e.g., @@ -950,7 +898,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou if (lifeCycle.getCurrentState() == RUNNING && isFollower() && getState().getCurrentTerm() == currentTerm) { // reset election timer to avoid punishing the leader for our own long disk writes - heartbeatMonitor.updateLastRpcTime(false); + updateLastRpcTime(false); } state.updateStatemachine(leaderCommit, currentTerm); reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), groupId, currentTerm, @@ -1006,9 +954,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou changeToFollowerAndPersistMetadata(leaderTerm); state.setLeader(leaderId, "installSnapshot"); - if (lifeCycle.getCurrentState() == RUNNING) { - heartbeatMonitor.updateLastRpcTime(true); - } + updateLastRpcTime(true); // Check and append the snapshot chunk. We simply put this in lock // considering a follower peer requiring a snapshot installation does not @@ -1026,9 +972,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou if (request.getDone()) { state.reloadStateMachine(lastIncludedIndex, leaderTerm); } - if (lifeCycle.getCurrentState() == RUNNING) { - heartbeatMonitor.updateLastRpcTime(false); - } + updateLastRpcTime(false); } if (request.getDone()) { LOG.info("{}: successfully install the whole snapshot-{}", getId(), @@ -1056,7 +1000,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } public void submitUpdateCommitEvent() { - Optional.ofNullable(leaderState).ifPresent(LeaderState::submitUpdateCommitEvent); + role.getLeaderState().ifPresent(LeaderState::submitUpdateCommitEvent); } /** @@ -1092,6 +1036,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou // update pending request boolean updateCache = true; // always update cache for follower synchronized (RaftServerImpl.this) { + final LeaderState leaderState = role.getLeaderState().orElse(null); if (isLeader() && leaderState != null) { // is leader and is running // For leader, update cache unless the reply is delayed. // When a reply is delayed, the cache will be updated in DelayedReply.getReply(). @@ -1104,19 +1049,11 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou }); } - private TransactionContext getTransactionContext(long index) { - if (leaderState != null) { // is leader and is running - return leaderState.getTransactionContext(index); - } - return null; - } - public long[] getFollowerNextIndices() { - LeaderState s = this.leaderState; - if (s == null || !isLeader()) { + if (!isLeader()) { return null; } - return s.getFollowerNextIndices(); + return role.getLeaderState().map(LeaderState::getFollowerNextIndices).orElse(null); } CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) { @@ -1127,10 +1064,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou stateMachine.setRaftConfiguration(ServerProtoUtils.toRaftConfiguration(next)); } else if (next.getLogEntryBodyCase() == SMLOGENTRY) { // check whether there is a TransactionContext because we are the leader. - TransactionContext trx = getTransactionContext(next.getIndex()); - if (trx == null) { - trx = new TransactionContextImpl(getRole(), stateMachine, next); - } + TransactionContext trx = role.getLeaderState() + .map(leader -> leader.getTransactionContext(next.getIndex())).orElseGet( + () -> new TransactionContextImpl(role.getCurrentRole(), stateMachine, next)); // Let the StateMachine inject logic for committed transactions in sequential order. trx = stateMachine.applyTransactionSerial(trx); @@ -1189,12 +1125,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou @Override public List<String> getFollowers() { - return Optional.ofNullable(leaderState) - .map(leader -> - leader.getFollowers().stream() - .map(RaftPeer::toString) - .collect(Collectors.toList())) - .orElse(Collections.emptyList()); + return role.getLeaderState().map(LeaderState::getFollowers).orElse(Collections.emptyList()) + .stream().map(RaftPeer::toString).collect(Collectors.toList()); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9b84d79c/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java index 9ee2aa1..eb25c71 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java @@ -18,20 +18,38 @@ package org.apache.ratis.server.impl; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.Timestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; /** * Maintain the Role of a Raft Peer. */ public class RoleInfo { + public static final Logger LOG = LoggerFactory.getLogger(RoleInfo.class); + private final RaftPeerId id; private volatile RaftPeerRole role; + /** Used when the peer is leader */ + private final AtomicReference<LeaderState> leaderState = new AtomicReference<>(); + /** Used when the peer is follower, to monitor election timeout */ + private final AtomicReference<FollowerState> followerState = new AtomicReference<>(); + /** Used when the peer is candidate, to request votes from other peers */ + private final AtomicReference<LeaderElection> leaderElection = new AtomicReference<>(); + private final AtomicReference<Timestamp> transitionTime; - RoleInfo() { + RoleInfo(RaftPeerId id) { + this.id = id; this.transitionTime = new AtomicReference<>(new Timestamp()); } @@ -60,6 +78,68 @@ public class RoleInfo { return role == RaftPeerRole.LEADER; } + Optional<LeaderState> getLeaderState() { + return Optional.ofNullable(leaderState.get()); + } + + LeaderState getLeaderStateNonNull() { + return Objects.requireNonNull(leaderState.get(), "leaderState is null"); + } + + LogEntryProto startLeaderState(RaftServerImpl server, RaftProperties properties) { + return updateAndGet(leaderState, new LeaderState(server, properties)).start(); + } + + void shutdownLeaderState(boolean allowNull) { + final LeaderState leader = leaderState.getAndSet(null); + if (leader == null) { + if (!allowNull) { + throw new NullPointerException("leaderState == null"); + } + } else { + LOG.info("{}: shutdown {}", id, leader.getClass().getSimpleName()); + leader.stop(); + } + // TODO: make sure that StateMachineUpdater has applied all transactions that have context + } + + Optional<FollowerState> getFollowerState() { + return Optional.ofNullable(followerState.get()); + } + + void startFollowerState(RaftServerImpl server) { + updateAndGet(followerState, new FollowerState(server)).start(); + } + + void shutdownFollowerState() { + final FollowerState follower = followerState.getAndSet(null); + if (follower != null) { + LOG.info("{}: shutdown {}", id, follower.getClass().getSimpleName()); + follower.stopRunning(); + follower.interrupt(); + } + } + + void startLeaderElection(RaftServerImpl server) { + updateAndGet(leaderElection, new LeaderElection(server)).start(); + } + + void shutdownLeaderElection() { + final LeaderElection election = leaderElection.getAndSet(null); + if (election != null) { + LOG.info("{}: shutdown {}", id, election.getClass().getSimpleName()); + election.stopRunning(); + // no need to interrupt the election thread + } + } + + private <T> T updateAndGet(AtomicReference<T> ref, T current) { + final T updated = ref.updateAndGet(previous -> previous != null? previous: current); + Preconditions.assertTrue(updated == current, "previous != null"); + LOG.info("{}: start {}", id, current.getClass().getSimpleName()); + return updated; + } + @Override public String toString() { return "" + role; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9b84d79c/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 53df265..6fbd43a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -211,4 +211,14 @@ public class ServerProtoUtils { return b.build(); } + static ServerRpcProto toServerRpcProto(RaftPeer peer, long delay) { + if (peer == null) { + // if no peer information return empty + return ServerRpcProto.getDefaultInstance(); + } + return ServerRpcProto.newBuilder() + .setId(ProtoUtils.toRaftPeerProto(peer)) + .setLastRpcElapsedTimeMs(delay) + .build(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9b84d79c/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 86b6b00..10afbfd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -35,7 +35,6 @@ import java.io.File; import java.io.IOException; import java.util.Objects; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import static org.apache.ratis.server.impl.RaftServerImpl.LOG; @@ -63,23 +62,23 @@ public class ServerState implements Closeable { * Latest term server has seen. initialized to 0 on first boot, increases * monotonically. */ - private long currentTerm; + private volatile long currentTerm; /** * The server ID of the leader for this term. Null means either there is * no leader for this term yet or this server does not know who it is yet. */ - private RaftPeerId leaderId; + private volatile RaftPeerId leaderId; /** * Candidate that this peer granted vote for in current term (or null if none) */ - private RaftPeerId votedFor; + private volatile RaftPeerId votedFor; /** * Latest installed snapshot for this server. This maybe different than StateMachine's latest * snapshot. Once we successfully install a snapshot, the SM may not pick it up immediately. * Further, this will not get updated when SM does snapshots itself. */ - private TermIndex latestInstalledSnapshot; + private volatile TermIndex latestInstalledSnapshot; ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop, RaftServerImpl server, StateMachine stateMachine) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9b84d79c/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index a4ec715..bcfaf01 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -83,7 +83,7 @@ public class RaftServerTestUtil { } public static Stream<LogAppender> getLogAppenders(RaftServerImpl server) { - return server.getLeaderState().getLogAppenders(); + return server.getRole().getLeaderState().map(LeaderState::getLogAppenders).orElse(null); } public static Logger getStateMachineUpdaterLog() {
