This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new d62117a  RATIS-1418. Leader step down for not sending out heartbeat promptly (#521)
d62117a is described below

commit d62117a2163d9c3027a6b5f7b18da36fb4e4a0ab
Author: Sammi Chen <[email protected]>
AuthorDate: Fri Nov 5 18:33:04 2021 +0800

    RATIS-1418. Leader step down for not sending out heartbeat promptly (#521)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 24 ++++++++++++++--------
 .../apache/ratis/server/leader/FollowerInfo.java   |  5 ++++-
 .../apache/ratis/server/leader/LogAppender.java    | 22 +++++++++-----------
 .../apache/ratis/server/impl/FollowerInfoImpl.java | 15 ++++++++++++--
 .../ratis/server/leader/LogAppenderBase.java       | 10 ++++-----
 .../ratis/server/leader/LogAppenderDefault.java    |  6 +++---
 6 files changed, 51 insertions(+), 31 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 1e8203d..70b23a7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -153,13 +153,13 @@ public class GrpcLogAppender extends LogAppenderBase {
     Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);
   }
 
-  private long getWaitTimeMs() {
+  public long getWaitTimeMs() {
     if (haveTooManyPendingRequests()) {
-      return getHeartbeatRemainingTimeMs(); // Should wait for a short time
+      return getHeartbeatWaitTimeMs(); // Should wait for a short time
     } else if (shouldSendAppendEntries()) {
       return 0L;
     }
-    return Math.min(10L, getHeartbeatRemainingTimeMs());
+    return Math.min(10L, getHeartbeatWaitTimeMs());
   }
 
   private void mayWait() {
@@ -237,7 +237,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     scheduler.onTimeout(requestTimeoutDuration,
         () -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()),
         LOG, () -> "Timeout check failed for append entry request: " + request);
-    getFollower().updateLastRpcSendTime();
+    getFollower().updateLastRpcSendTime(request.isHeartbeat());
   }
 
   private void timeoutAppendRequest(long cid, boolean heartbeat) {
@@ -510,7 +510,7 @@ public class GrpcLogAppender extends LogAppenderBase {
       for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
         if (isRunning()) {
           snapshotRequestObserver.onNext(request);
-          getFollower().updateLastRpcSendTime();
+          getFollower().updateLastRpcSendTime(false);
           responseHandler.addPending(request);
         } else {
           break;
@@ -560,7 +560,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     try {
       snapshotRequestObserver = getClient().installSnapshot(responseHandler);
       snapshotRequestObserver.onNext(request);
-      getFollower().updateLastRpcSendTime();
+      getFollower().updateLastRpcSendTime(false);
       responseHandler.addPending(request);
       snapshotRequestObserver.onCompleted();
     } catch (Exception e) {
@@ -596,10 +596,18 @@ public class GrpcLogAppender extends LogAppenderBase {
       // If the follower is bootstrapping and has not yet installed any snapshot from leader, then the follower should
       // be notified to install a snapshot. Every follower should try to install at least one snapshot during
       // bootstrapping, if available.
+      TermIndex lastEntry = getRaftLog().getLastEntryTermIndex();
+      if (lastEntry == null) {
+        // lastEntry may need to be derived from snapshot
+        SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot();
+        if (snapshot != null) {
+          lastEntry = snapshot.getTermIndex();
+        }
+      }
       if (LOG.isDebugEnabled()) {
-        LOG.debug("{}: Notify follower to install snapshot as it is bootstrapping.", this);
+        LOG.debug("{}: Notify follower to install snapshot as it is bootstrapping to TermIndex {}.", this, lastEntry);
       }
-      return getRaftLog().getLastEntryTermIndex();
+      return lastEntry;
     }
 
     final long followerNextIndex = follower.getNextIndex();
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
index 2de24b0..b4ae845 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
@@ -82,8 +82,11 @@ public interface FollowerInfo {
   void updateLastRpcResponseTime();
 
   /** Update lastRpcSendTime to the current time. */
-  void updateLastRpcSendTime();
+  void updateLastRpcSendTime(boolean isHeartbeat);
 
   /** @return the latest of the lastRpcSendTime and the lastRpcResponseTime . */
   Timestamp getLastRpcTime();
+
+  /** @return the latest heartbeat send time. */
+  Timestamp getLastHeartbeatSendTime();
 }
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 5b45ca2..4ec13eb 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -151,24 +151,22 @@ public interface LogAppender {
 
   /** Should the leader send appendEntries RPC to the follower? */
   default boolean shouldSendAppendEntries() {
-    return hasAppendEntries() || shouldHeartbeat();
+    return hasAppendEntries() || getHeartbeatWaitTimeMs() <= 0;
   }
 
-  /** Does it has outstanding appendEntries? */
+  /** Does it have outstanding appendEntries? */
   default boolean hasAppendEntries() {
     return getFollower().getNextIndex() < getRaftLog().getNextIndex();
   }
 
-  /** The same as getHeartbeatRemainingTime() <= 0. */
-  default boolean shouldHeartbeat() {
-    return getHeartbeatRemainingTimeMs() <= 0;
-  }
-
-  /**
-   * @return the time in milliseconds that the leader should send a heartbeat.
-   */
-  default long getHeartbeatRemainingTimeMs() {
-    return getServer().properties().minRpcTimeoutMs()/2 - getFollower().getLastRpcTime().elapsedTimeMs();
+  /** @return the wait time in milliseconds to send the next heartbeat. */
+  default long getHeartbeatWaitTimeMs() {
+    final int min = getServer().properties().minRpcTimeoutMs();
+    // time remaining to send a heartbeat
+    final long heartbeatRemainingTimeMs = min/2 - getFollower().getLastRpcResponseTime().elapsedTimeMs();
+    // avoid sending heartbeat too frequently
+    final long noHeartbeatTimeMs = min/4 - getFollower().getLastHeartbeatSendTime().elapsedTimeMs();
+    return Math.max(heartbeatRemainingTimeMs, noHeartbeatTimeMs);
   }
 
   /** Handle the event that the follower has replied a term. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 1499903..81dbe29 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -35,6 +35,7 @@ class FollowerInfoImpl implements FollowerInfo {
   private final RaftPeer peer;
   private final AtomicReference<Timestamp> lastRpcResponseTime;
   private final AtomicReference<Timestamp> lastRpcSendTime;
+  private final AtomicReference<Timestamp> lastHeartbeatSendTime;
   private final RaftLogIndex nextIndex;
   private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", 0L);
   private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
@@ -50,6 +51,7 @@ class FollowerInfoImpl implements FollowerInfo {
     this.peer = peer;
     this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime);
     this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
+    this.lastHeartbeatSendTime = new AtomicReference<>(lastRpcTime);
     this.nextIndex = new RaftLogIndex("nextIndex", nextIndex);
     this.attendVote = attendVote;
   }
@@ -159,12 +161,21 @@ class FollowerInfoImpl implements FollowerInfo {
   }
 
   @Override
-  public void updateLastRpcSendTime() {
-    lastRpcSendTime.set(Timestamp.currentTime());
+  public void updateLastRpcSendTime(boolean isHeartbeat) {
+    final Timestamp currentTime = Timestamp.currentTime();
+    lastRpcSendTime.set(currentTime);
+    if (isHeartbeat) {
+      lastHeartbeatSendTime.set(currentTime);
+    }
   }
 
   @Override
   public Timestamp getLastRpcTime() {
     return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get());
   }
+
+  @Override
+  public Timestamp getLastHeartbeatSendTime() {
+    return lastHeartbeatSendTime.get();
+  }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index b706e87..28ce1eb 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -127,8 +127,8 @@ public abstract class LogAppenderBase implements LogAppender {
   public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException {
     final TermIndex previous = getPrevious(follower.getNextIndex());
     final long snapshotIndex = follower.getSnapshotIndex();
-    final long heartbeatRemainingMs = getHeartbeatRemainingTimeMs();
-    if (heartbeatRemainingMs <= 0L || heartbeat) {
+    final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs();
+    if (heartbeatWaitTimeMs <= 0L || heartbeat) {
       // heartbeat
       return leaderState.newAppendEntriesRequestProto(follower, Collections.emptyList(), previous, callId);
     }
@@ -137,8 +137,8 @@ public abstract class LogAppenderBase implements LogAppender {
 
     final long leaderNext = getRaftLog().getNextIndex();
     final long followerNext = follower.getNextIndex();
-    final long halfMs = heartbeatRemainingMs/2;
-    for (long next = followerNext; leaderNext > next && getHeartbeatRemainingTimeMs() - halfMs > 0; ) {
+    final long halfMs = heartbeatWaitTimeMs/2;
+    for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; ) {
       if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
         break;
       }
@@ -147,7 +147,7 @@ public abstract class LogAppenderBase implements LogAppender {
       return null;
     }
 
-    final List<LogEntryProto> protos = buffer.pollList(getHeartbeatRemainingTimeMs(), EntryWithData::getEntry,
+    final List<LogEntryProto> protos = buffer.pollList(getHeartbeatWaitTimeMs(), EntryWithData::getEntry,
         (entry, time, exception) -> LOG.warn("{}: Failed to get {} in {}: {}",
             follower.getName(), entry, time, exception));
     buffer.clear();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index ad05c51..06937f8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -61,7 +61,7 @@ class LogAppenderDefault extends LogAppenderBase {
           return null;
         }
 
-        getFollower().updateLastRpcSendTime();
+        getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0);
         final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
         getFollower().updateLastRpcResponseTime();
 
@@ -88,7 +88,7 @@ class LogAppenderDefault extends LogAppenderBase {
     InstallSnapshotReplyProto reply = null;
     try {
       for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
-        getFollower().updateLastRpcSendTime();
+        getFollower().updateLastRpcSendTime(false);
         reply = getServerRpc().installSnapshot(request);
         getFollower().updateLastRpcResponseTime();
 
@@ -145,7 +145,7 @@ class LogAppenderDefault extends LogAppenderBase {
         }
       }
       if (isRunning() && !hasAppendEntries()) {
-        final long waitTime = getHeartbeatRemainingTimeMs();
+        final long waitTime = getHeartbeatWaitTimeMs();
         if (waitTime > 0) {
           synchronized (this) {
             wait(waitTime);

Reply via email to