This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch RATIS-1209 in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
commit 2b811c1bffd830049eb86d050e2b95e2d3d719ea Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Sun Dec 6 13:41:48 2020 +0800 rename methods --- .../apache/ratis/grpc/server/GrpcLogAppender.java | 34 ++++++++----- .../apache/ratis/server/impl/LeaderStateImpl.java | 24 ++++----- .../org/apache/ratis/server/impl/LogAppender.java | 57 ++++++++++------------ .../ratis/server/impl/LogAppenderDaemon.java | 2 +- 4 files changed, 59 insertions(+), 58 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 85d3572..dc8e40f 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -114,10 +114,18 @@ public class GrpcLogAppender extends LogAppender { getFollower().decreaseNextIndex(nextIndex); } + private boolean haveLogEntriesToSendOut() { + return shouldAppendEntries(getFollower().getNextIndex()); + } + + private boolean isFollowerCommitBehindLastCommitIndex() { + return getRaftLog().getLastCommittedIndex() > getFollower().getCommitIndex(); + } + @Override - protected void runAppenderImpl() throws IOException { + protected void run() throws IOException { boolean installSnapshotRequired; - for(; isAppenderRunning(); mayWait()) { + for(; isRunning(); mayWait()) { installSnapshotRequired = false; //HB period is expired OR we have messages OR follower is behind with commit index @@ -174,9 +182,9 @@ public class GrpcLogAppender extends LogAppender { } @Override - public void stopAppender() { + public void stop() { grpcServerMetrics.unregister(); - super.stopAppender(); + super.stop(); } @Override @@ -203,7 +211,7 @@ public class GrpcLogAppender extends LogAppender { // prepare and enqueue the append request. note changes on follower's // nextIndex and ops on pendingRequests should always be associated // together and protected by the lock - pending = createRequest(callId++, excludeLogEntries); + pending = newAppendEntriesRequest(callId++, excludeLogEntries); if (pending == null) { return; } @@ -216,7 +224,7 @@ public class GrpcLogAppender extends LogAppender { s = appendLogRequestObserver; } - if (isAppenderRunning()) { + if (isRunning()) { sendRequest(request, pending, s); } } @@ -316,7 +324,7 @@ public class GrpcLogAppender extends LogAppender { default: throw new IllegalStateException("Unexpected reply result: " + reply.getResult()); } - notifyAppend(); + notifyLogAppender(); } /** @@ -324,7 +332,7 @@ public class GrpcLogAppender extends LogAppender { */ @Override public void onError(Throwable t) { - if (!isAppenderRunning()) { + if (!isRunning()) { LOG.info("{} is stopped", GrpcLogAppender.this); return; } @@ -382,7 +390,7 @@ public class GrpcLogAppender extends LogAppender { void close() { done.set(true); - GrpcLogAppender.this.notifyAppend(); + notifyLogAppender(); } synchronized boolean hasAllResponse() { @@ -439,7 +447,7 @@ public class GrpcLogAppender extends LogAppender { @Override public void onError(Throwable t) { - if (!isAppenderRunning()) { + if (!isRunning()) { LOG.info("{} is stopped", GrpcLogAppender.this); return; } @@ -477,7 +485,7 @@ public class GrpcLogAppender extends LogAppender { try { snapshotRequestObserver = getClient().installSnapshot(responseHandler); for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) { - if (isAppenderRunning()) { + if (isRunning()) { snapshotRequestObserver.onNext(request); getFollower().updateLastRpcSendTime(); responseHandler.addPending(request); @@ -496,7 +504,7 @@ public class GrpcLogAppender extends LogAppender { } synchronized (this) { - while (isAppenderRunning() && !responseHandler.isDone()) { + while (isRunning() && !responseHandler.isDone()) { try { wait(); } catch (InterruptedException ignored) { @@ -541,7 +549,7 @@ public class GrpcLogAppender extends LogAppender { } synchronized (this) { - if (isAppenderRunning() && !responseHandler.isDone()) { + if (isRunning() && !responseHandler.isDone()) { try { wait(); } catch (InterruptedException ignored) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 3d52e44..ad6d7c37 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -284,7 +284,7 @@ class LeaderStateImpl implements LeaderState { server.getId().toString(), null); raftLog.append(placeHolder); processor.start(); - senders.forEach(LogAppender::startAppender); + senders.forEach(LogAppender::start); return placeHolder; } @@ -295,7 +295,7 @@ class LeaderStateImpl implements LeaderState { void stop() { this.running = false; // do not interrupt event processor since it may be in the middle of logSync - senders.forEach(LogAppender::stopAppender); + senders.forEach(LogAppender::stop); final NotLeaderException nle = server.generateNotLeaderException(); final Collection<CommitInfoProto> commitInfos = server.getCommitInfos(); try { @@ -313,7 +313,7 @@ class LeaderStateImpl implements LeaderState { } void notifySenders() { - senders.forEach(LogAppender::notifyAppend); + senders.forEach(LogAppender::notifyLogAppender); } boolean inStagingState() { @@ -466,7 +466,7 @@ class LeaderStateImpl implements LeaderState { * Update sender list for setConfiguration request */ void addAndStartSenders(Collection<RaftPeer> newPeers) { - addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::startAppender); + addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::start); } Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) { @@ -486,7 +486,7 @@ class LeaderStateImpl implements LeaderState { void stopAndRemoveSenders(Predicate<LogAppender> predicate) { final List<LogAppender> toStop = senders.stream().filter(predicate).collect(Collectors.toList()); - toStop.forEach(LogAppender::stopAppender); + toStop.forEach(LogAppender::stop); senders.removeAll(toStop); } @@ -494,7 +494,7 @@ class LeaderStateImpl implements LeaderState { public void restart(LogAppender sender) { final FollowerInfo follower = sender.getFollower(); LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), follower.getName()); - sender.stopAppender(); + sender.stop(); senders.removeAll(Collections.singleton(sender)); addAndStartSenders(Collections.singleton(follower.getPeer())); } @@ -504,7 +504,7 @@ class LeaderStateImpl implements LeaderState { */ private void updateSenders(RaftConfiguration conf) { Preconditions.assertTrue(conf.isStable() && !inStagingState()); - stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollower().getPeer().getId())); + stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollowerId())); } void submitStepDownEvent(StepDownReason reason) { @@ -845,14 +845,14 @@ class LeaderStateImpl implements LeaderState { private List<List<RaftPeerId>> divideFollowers(RaftConfiguration conf) { List<List<RaftPeerId>> lists = new ArrayList<>(2); List<RaftPeerId> listForNew = senders.stream() - .filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId())) - .map(sender -> sender.getFollower().getPeer().getId()) + .map(LogAppender::getFollowerId) + .filter(conf::containsInConf) .collect(Collectors.toList()); lists.add(listForNew); if (conf.isTransitional()) { List<RaftPeerId> listForOld = senders.stream() - .filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId())) - .map(sender -> sender.getFollower().getPeer().getId()) + .map(LogAppender::getFollowerId) + .filter(conf::containsInOldConf) .collect(Collectors.toList()); lists.add(listForOld); } @@ -923,7 +923,7 @@ class LeaderStateImpl implements LeaderState { .filter(sender -> sender.getFollower() .getLastRpcResponseTime() .elapsedTimeMs() <= server.getMaxTimeoutMs()) - .map(sender -> sender.getFollower().getPeer().getId()) + .map(LogAppender::getFollowerId) .collect(Collectors.toList()); final RaftConfiguration conf = server.getRaftConf(); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index a922e00..ada5424 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -50,7 +50,6 @@ public class LogAppender { private final String name; private final RaftServer.Division server; private final LeaderState leaderState; - private final RaftLog raftLog; private final FollowerInfo follower; private final DataQueue<EntryWithData> buffer; @@ -64,7 +63,6 @@ public class LogAppender { this.name = follower.getName() + "-" + JavaUtils.getClassSimpleName(getClass()); this.server = server; this.leaderState = leaderState; - this.raftLog = server.getRaftLog(); final RaftProperties properties = server.getRaftServer().getProperties(); this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt(); @@ -85,7 +83,7 @@ public class LogAppender { } public RaftLog getRaftLog() { - return raftLog; + return getServer().getRaftLog(); } @Override @@ -93,15 +91,15 @@ public class LogAppender { return name; } - void startAppender() { + public void start() { daemon.tryToStart(); } - public boolean isAppenderRunning() { + public boolean isRunning() { return daemon.isWorking(); } - public void stopAppender() { + public void stop() { daemon.tryToClose(); } @@ -123,7 +121,7 @@ public class LogAppender { } final long previousIndex = nextIndex - 1; - final TermIndex previous = raftLog.getTermIndex(previousIndex); + final TermIndex previous = getRaftLog().getTermIndex(previousIndex); if (previous != null) { return previous; } @@ -139,7 +137,7 @@ public class LogAppender { return null; } - protected AppendEntriesRequestProto createRequest(long callId, + protected AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException { final TermIndex previous = getPrevious(follower.getNextIndex()); final long snapshotIndex = follower.getSnapshotIndex(); @@ -151,11 +149,11 @@ public class LogAppender { Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements."); - final long leaderNext = raftLog.getNextIndex(); + final long leaderNext = getRaftLog().getNextIndex(); final long followerNext = follower.getNextIndex(); final long halfMs = heartbeatRemainingMs/2; for (long next = followerNext; leaderNext > next && getHeartbeatRemainingTime() - halfMs > 0; ) { - if (!buffer.offer(raftLog.getEntryWithData(next++))) { + if (!buffer.offer(getRaftLog().getEntryWithData(next++))) { break; } } @@ -195,16 +193,16 @@ public class LogAppender { throws InterruptedException, InterruptedIOException, RaftLogIOException { int retry = 0; AppendEntriesRequestProto request = null; - while (isAppenderRunning()) { // keep retrying for IOException + while (isRunning()) { // keep retrying for IOException try { if (request == null || request.getEntriesCount() == 0) { - request = createRequest(DEFAULT_CALLID, false); + request = newAppendEntriesRequest(DEFAULT_CALLID, false); } if (request == null) { LOG.trace("{} no entries to send now, wait ...", this); return null; - } else if (!isAppenderRunning()) { + } else if (!isRunning()) { LOG.info("{} is stopped. Skip appendEntries.", this); return null; } @@ -224,7 +222,7 @@ public class LogAppender { } handleException(ioe); } - if (isAppenderRunning()) { + if (isRunning()) { server.properties().rpcSleepTime().sleep(); } } @@ -275,11 +273,11 @@ public class LogAppender { } protected SnapshotInfo shouldInstallSnapshot() { - final long logStartIndex = raftLog.getStartIndex(); + final long logStartIndex = getRaftLog().getStartIndex(); // we should install snapshot if the follower needs to catch up and: // 1. there is no local log entry but there is snapshot // 2. or the follower's next index is smaller than the log start index - if (follower.getNextIndex() < raftLog.getNextIndex()) { + if (follower.getNextIndex() < getRaftLog().getNextIndex()) { final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot(); if (follower.getNextIndex() < logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) { @@ -290,13 +288,13 @@ public class LogAppender { } /** Check and send appendEntries RPC */ - protected void runAppenderImpl() throws InterruptedException, IOException { - while (isAppenderRunning()) { + protected void run() throws InterruptedException, IOException { + while (isRunning()) { if (shouldSendRequest()) { SnapshotInfo snapshot = shouldInstallSnapshot(); if (snapshot != null) { LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower", - this, follower.getNextIndex(), raftLog.getStartIndex(), snapshot); + this, follower.getNextIndex(), getRaftLog().getStartIndex(), snapshot); final InstallSnapshotReplyProto r = installSnapshot(snapshot); if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) { @@ -309,7 +307,7 @@ public class LogAppender { } } } - if (isAppenderRunning() && !shouldAppendEntries(follower.getNextIndex())) { + if (isRunning() && !shouldAppendEntries(follower.getNextIndex())) { final long waitTime = getHeartbeatRemainingTime(); if (waitTime > 0) { synchronized (this) { @@ -359,8 +357,10 @@ public class LogAppender { getServerRpc().handleException(getFollowerId(), e, false); } - public synchronized void notifyAppend() { - this.notify(); + public void notifyLogAppender() { + synchronized (this) { + this.notify(); + } } /** Should the leader send appendEntries RPC to this follower? */ @@ -368,16 +368,9 @@ public class LogAppender { return shouldAppendEntries(follower.getNextIndex()) || heartbeatTimeout(); } - protected boolean haveLogEntriesToSendOut() { - return shouldAppendEntries(follower.getNextIndex()); - } - - protected boolean isFollowerCommitBehindLastCommitIndex() { - return raftLog.getLastCommittedIndex() > follower.getCommitIndex(); - } - private boolean shouldAppendEntries(long followerIndex) { - return followerIndex < raftLog.getNextIndex(); + public boolean shouldAppendEntries(long followerIndex) { + return followerIndex < getRaftLog().getNextIndex(); } protected boolean heartbeatTimeout() { @@ -393,7 +386,7 @@ public class LogAppender { protected boolean checkResponseTerm(long responseTerm) { synchronized (server) { - return isAppenderRunning() && leaderState.onFollowerTerm(follower, responseTerm); + return isRunning() && leaderState.onFollowerTerm(follower, responseTerm); } } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java index 4d0a662..4c375a8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java @@ -74,7 +74,7 @@ class LogAppenderDaemon { private void run() { try { if (lifeCycle.transition(TRY_TO_RUN) == RUNNING) { - logAppender.runAppenderImpl(); + logAppender.run(); } lifeCycle.compareAndTransition(RUNNING, CLOSING); } catch (InterruptedException e) {
