This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d62117a RATIS-1418. Leader step down for not sending out heartbeat promptly (#521)
d62117a is described below
commit d62117a2163d9c3027a6b5f7b18da36fb4e4a0ab
Author: Sammi Chen <[email protected]>
AuthorDate: Fri Nov 5 18:33:04 2021 +0800
RATIS-1418. Leader step down for not sending out heartbeat promptly (#521)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 24 ++++++++++++++--------
.../apache/ratis/server/leader/FollowerInfo.java | 5 ++++-
.../apache/ratis/server/leader/LogAppender.java | 22 +++++++++-----------
.../apache/ratis/server/impl/FollowerInfoImpl.java | 15 ++++++++++++--
.../ratis/server/leader/LogAppenderBase.java | 10 ++++-----
.../ratis/server/leader/LogAppenderDefault.java | 6 +++---
6 files changed, 51 insertions(+), 31 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 1e8203d..70b23a7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -153,13 +153,13 @@ public class GrpcLogAppender extends LogAppenderBase {
Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);
}
- private long getWaitTimeMs() {
+ public long getWaitTimeMs() {
if (haveTooManyPendingRequests()) {
- return getHeartbeatRemainingTimeMs(); // Should wait for a short time
+ return getHeartbeatWaitTimeMs(); // Should wait for a short time
} else if (shouldSendAppendEntries()) {
return 0L;
}
- return Math.min(10L, getHeartbeatRemainingTimeMs());
+ return Math.min(10L, getHeartbeatWaitTimeMs());
}
private void mayWait() {
@@ -237,7 +237,7 @@ public class GrpcLogAppender extends LogAppenderBase {
scheduler.onTimeout(requestTimeoutDuration,
() -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()),
LOG, () -> "Timeout check failed for append entry request: " +
request);
- getFollower().updateLastRpcSendTime();
+ getFollower().updateLastRpcSendTime(request.isHeartbeat());
}
private void timeoutAppendRequest(long cid, boolean heartbeat) {
@@ -510,7 +510,7 @@ public class GrpcLogAppender extends LogAppenderBase {
for (InstallSnapshotRequestProto request :
newInstallSnapshotRequests(requestId, snapshot)) {
if (isRunning()) {
snapshotRequestObserver.onNext(request);
- getFollower().updateLastRpcSendTime();
+ getFollower().updateLastRpcSendTime(false);
responseHandler.addPending(request);
} else {
break;
@@ -560,7 +560,7 @@ public class GrpcLogAppender extends LogAppenderBase {
try {
snapshotRequestObserver = getClient().installSnapshot(responseHandler);
snapshotRequestObserver.onNext(request);
- getFollower().updateLastRpcSendTime();
+ getFollower().updateLastRpcSendTime(false);
responseHandler.addPending(request);
snapshotRequestObserver.onCompleted();
} catch (Exception e) {
@@ -596,10 +596,18 @@ public class GrpcLogAppender extends LogAppenderBase {
// If the follower is bootstrapping and has not yet installed any
snapshot from leader, then the follower should
// be notified to install a snapshot. Every follower should try to
install at least one snapshot during
// bootstrapping, if available.
+ TermIndex lastEntry = getRaftLog().getLastEntryTermIndex();
+ if (lastEntry == null) {
+ // lastEntry may need to be derived from snapshot
+ SnapshotInfo snapshot =
getServer().getStateMachine().getLatestSnapshot();
+ if (snapshot != null) {
+ lastEntry = snapshot.getTermIndex();
+ }
+ }
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Notify follower to install snapshot as it is
bootstrapping.", this);
+ LOG.debug("{}: Notify follower to install snapshot as it is
bootstrapping to TermIndex {}.", this, lastEntry);
}
- return getRaftLog().getLastEntryTermIndex();
+ return lastEntry;
}
final long followerNextIndex = follower.getNextIndex();
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
index 2de24b0..b4ae845 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
@@ -82,8 +82,11 @@ public interface FollowerInfo {
void updateLastRpcResponseTime();
/** Update lastRpcSendTime to the current time. */
- void updateLastRpcSendTime();
+ void updateLastRpcSendTime(boolean isHeartbeat);
/** @return the latest of the lastRpcSendTime and the lastRpcResponseTime .
*/
Timestamp getLastRpcTime();
+
+ /** @return the latest heartbeat send time. */
+ Timestamp getLastHeartbeatSendTime();
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 5b45ca2..4ec13eb 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -151,24 +151,22 @@ public interface LogAppender {
/** Should the leader send appendEntries RPC to the follower? */
default boolean shouldSendAppendEntries() {
- return hasAppendEntries() || shouldHeartbeat();
+ return hasAppendEntries() || getHeartbeatWaitTimeMs() <= 0;
}
- /** Does it has outstanding appendEntries? */
+ /** Does it have outstanding appendEntries? */
default boolean hasAppendEntries() {
return getFollower().getNextIndex() < getRaftLog().getNextIndex();
}
- /** The same as getHeartbeatRemainingTime() <= 0. */
- default boolean shouldHeartbeat() {
- return getHeartbeatRemainingTimeMs() <= 0;
- }
-
- /**
- * @return the time in milliseconds that the leader should send a heartbeat.
- */
- default long getHeartbeatRemainingTimeMs() {
- return getServer().properties().minRpcTimeoutMs()/2 -
getFollower().getLastRpcTime().elapsedTimeMs();
+ /** @return the wait time in milliseconds to send the next heartbeat. */
+ default long getHeartbeatWaitTimeMs() {
+ final int min = getServer().properties().minRpcTimeoutMs();
+ // time remaining to send a heartbeat
+ final long heartbeatRemainingTimeMs = min/2 -
getFollower().getLastRpcResponseTime().elapsedTimeMs();
+ // avoid sending heartbeat too frequently
+ final long noHeartbeatTimeMs = min/4 -
getFollower().getLastHeartbeatSendTime().elapsedTimeMs();
+ return Math.max(heartbeatRemainingTimeMs, noHeartbeatTimeMs);
}
/** Handle the event that the follower has replied a term. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 1499903..81dbe29 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -35,6 +35,7 @@ class FollowerInfoImpl implements FollowerInfo {
private final RaftPeer peer;
private final AtomicReference<Timestamp> lastRpcResponseTime;
private final AtomicReference<Timestamp> lastRpcSendTime;
+ private final AtomicReference<Timestamp> lastHeartbeatSendTime;
private final RaftLogIndex nextIndex;
private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", 0L);
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex",
RaftLog.INVALID_LOG_INDEX);
@@ -50,6 +51,7 @@ class FollowerInfoImpl implements FollowerInfo {
this.peer = peer;
this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime);
this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
+ this.lastHeartbeatSendTime = new AtomicReference<>(lastRpcTime);
this.nextIndex = new RaftLogIndex("nextIndex", nextIndex);
this.attendVote = attendVote;
}
@@ -159,12 +161,21 @@ class FollowerInfoImpl implements FollowerInfo {
}
@Override
- public void updateLastRpcSendTime() {
- lastRpcSendTime.set(Timestamp.currentTime());
+ public void updateLastRpcSendTime(boolean isHeartbeat) {
+ final Timestamp currentTime = Timestamp.currentTime();
+ lastRpcSendTime.set(currentTime);
+ if (isHeartbeat) {
+ lastHeartbeatSendTime.set(currentTime);
+ }
}
@Override
public Timestamp getLastRpcTime() {
return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get());
}
+
+ @Override
+ public Timestamp getLastHeartbeatSendTime() {
+ return lastHeartbeatSendTime.get();
+ }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index b706e87..28ce1eb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -127,8 +127,8 @@ public abstract class LogAppenderBase implements
LogAppender {
public AppendEntriesRequestProto newAppendEntriesRequest(long callId,
boolean heartbeat) throws RaftLogIOException {
final TermIndex previous = getPrevious(follower.getNextIndex());
final long snapshotIndex = follower.getSnapshotIndex();
- final long heartbeatRemainingMs = getHeartbeatRemainingTimeMs();
- if (heartbeatRemainingMs <= 0L || heartbeat) {
+ final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs();
+ if (heartbeatWaitTimeMs <= 0L || heartbeat) {
// heartbeat
return leaderState.newAppendEntriesRequestProto(follower,
Collections.emptyList(), previous, callId);
}
@@ -137,8 +137,8 @@ public abstract class LogAppenderBase implements
LogAppender {
final long leaderNext = getRaftLog().getNextIndex();
final long followerNext = follower.getNextIndex();
- final long halfMs = heartbeatRemainingMs/2;
- for (long next = followerNext; leaderNext > next &&
getHeartbeatRemainingTimeMs() - halfMs > 0; ) {
+ final long halfMs = heartbeatWaitTimeMs/2;
+ for (long next = followerNext; leaderNext > next &&
getHeartbeatWaitTimeMs() - halfMs > 0; ) {
if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
break;
}
@@ -147,7 +147,7 @@ public abstract class LogAppenderBase implements
LogAppender {
return null;
}
- final List<LogEntryProto> protos =
buffer.pollList(getHeartbeatRemainingTimeMs(), EntryWithData::getEntry,
+ final List<LogEntryProto> protos =
buffer.pollList(getHeartbeatWaitTimeMs(), EntryWithData::getEntry,
(entry, time, exception) -> LOG.warn("{}: Failed to get {} in {}: {}",
follower.getName(), entry, time, exception));
buffer.clear();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index ad05c51..06937f8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -61,7 +61,7 @@ class LogAppenderDefault extends LogAppenderBase {
return null;
}
- getFollower().updateLastRpcSendTime();
+ getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0);
final AppendEntriesReplyProto r =
getServerRpc().appendEntries(request);
getFollower().updateLastRpcResponseTime();
@@ -88,7 +88,7 @@ class LogAppenderDefault extends LogAppenderBase {
InstallSnapshotReplyProto reply = null;
try {
for (InstallSnapshotRequestProto request :
newInstallSnapshotRequests(requestId, snapshot)) {
- getFollower().updateLastRpcSendTime();
+ getFollower().updateLastRpcSendTime(false);
reply = getServerRpc().installSnapshot(request);
getFollower().updateLastRpcResponseTime();
@@ -145,7 +145,7 @@ class LogAppenderDefault extends LogAppenderBase {
}
}
if (isRunning() && !hasAppendEntries()) {
- final long waitTime = getHeartbeatRemainingTimeMs();
+ final long waitTime = getHeartbeatWaitTimeMs();
if (waitTime > 0) {
synchronized (this) {
wait(waitTime);