This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c4cfd5ece RATIS-1945. Enforce min wait time only for INCONSISTENCY replies. (#975)
c4cfd5ece is described below

commit c4cfd5eced5d81c17feab82ca76efdac541a0632
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Nov 30 20:06:56 2023 -0800

    RATIS-1945. Enforce min wait time only for INCONSISTENCY replies. (#975)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 130 +++++++++++++++------
 1 file changed, 95 insertions(+), 35 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index b9764e987..7adbc7355 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -37,6 +37,7 @@ import org.apache.ratis.server.util.ServerStringUtils;
 import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
@@ -52,7 +53,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -67,11 +67,78 @@ public class GrpcLogAppender extends LogAppenderBase {
     return diff == 0? 0: diff > 0? 1: -1;
   };
 
+  enum Event { // events fed into ReplyState.process(Event) by this appender
+    APPEND_ENTRIES_REPLY, // appendEntries reply whose result is not INCONSISTENCY
+    APPEND_ENTRIES_INCONSISTENCY_REPLY, // appendEntries reply whose result is INCONSISTENCY
+    SNAPSHOT_REPLY, // installSnapshot reply
+    COMPLETE, // a response stream reported onCompleted
+    TIMEOUT, // a pending appendEntries request timed out
+    ERROR; // a response stream reported onError
+
+    boolean updateFirstReplyReceived(boolean firstReplyReceived) { // returns the new first-reply flag
+      switch (this) {
+        case APPEND_ENTRIES_REPLY:
+        case APPEND_ENTRIES_INCONSISTENCY_REPLY:
+        case SNAPSHOT_REPLY:
+        case COMPLETE:
+          return true; // any reply, or a normal stream completion, marks the first reply as received
+        case ERROR:
+          return false; // a stream error resets the flag
+        case TIMEOUT:
+          return firstReplyReceived; // a timeout leaves the flag unchanged
+        default:
+          throw new IllegalStateException("Unexpected event: " + this);
+      }
+    }
+
+    boolean isError() { // true: ReplyState increments errorCount; false: ReplyState resets it to 0
+      switch (this) {
+        case APPEND_ENTRIES_INCONSISTENCY_REPLY: // counted as an error so errorWaitTimeMs() sees a non-zero count
+        case TIMEOUT:
+        case ERROR:
+          return true;
+        case APPEND_ENTRIES_REPLY:
+        case SNAPSHOT_REPLY:
+        case COMPLETE:
+          return false;
+        default:
+          throw new IllegalStateException("Unexpected event: " + this);
+      }
+    }
+  }
+
+  static class ReplyState { // first-reply flag plus consecutive error count; fields only touched in synchronized methods
+    private boolean firstReplyReceived = false; // updated through Event.updateFirstReplyReceived
+    private int errorCount = 0; // consecutive error events; reset to 0 by any non-error event
+
+    synchronized boolean isFirstReplyReceived() {
+      return firstReplyReceived;
+    }
+
+    synchronized int getErrorCount() { // used by errorWaitTimeMs() as the attempt count for errorRetryWaitPolicy
+      return errorCount;
+    }
+
+    int process(AppendResult result) { // maps a reply result to an Event; delegates to the synchronized overload
+      return process(result == AppendResult.INCONSISTENCY? 
Event.APPEND_ENTRIES_INCONSISTENCY_REPLY
+          : Event.APPEND_ENTRIES_REPLY);
+    }
+
+    synchronized int process(Event event) { // applies the event and returns the updated consecutive error count
+      firstReplyReceived = event.updateFirstReplyReceived(firstReplyReceived);
+      if (event.isError()) {
+        errorCount++;
+      } else {
+        errorCount = 0;
+      }
+      return errorCount;
+    }
+  }
+
   private final AtomicLong callId = new AtomicLong();
 
   private final RequestMap pendingRequests = new RequestMap();
   private final int maxPendingRequestsNum;
-  private volatile boolean firstResponseReceived = false;
   private final boolean installSnapshotEnabled;
 
   private final TimeDuration requestTimeoutDuration;
@@ -87,7 +154,7 @@ public class GrpcLogAppender extends LogAppenderBase {
   private final AutoCloseableReadWriteLock lock;
   private final StackTraceElement caller;
   private final RetryPolicy errorRetryWaitPolicy;
-  private final AtomicInteger errCount = new AtomicInteger(0);
+  private final ReplyState replyState = new ReplyState();
 
   public GrpcLogAppender(RaftServer.Division server, LeaderState leaderState, 
FollowerInfo f) {
     super(server, leaderState, f);
@@ -121,14 +188,14 @@ public class GrpcLogAppender extends LogAppenderBase {
     return getServerRpc().getProxies().getProxy(getFollowerId());
   }
 
-  private void resetClient(AppendEntriesRequest request, boolean onError) {
+  private void resetClient(AppendEntriesRequest request, Event event) {
     try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
       getClient().resetConnectBackoff();
       if (appendLogRequestObserver != null) {
         appendLogRequestObserver.stop();
         appendLogRequestObserver = null;
       }
-      firstResponseReceived = false;
+      final int errorCount = replyState.process(event);
       // clear the pending requests queue and reset the next index of follower
       pendingRequests.clear();
       final FollowerInfo f = getFollower();
@@ -136,9 +203,9 @@ public class GrpcLogAppender extends LogAppenderBase {
           .map(AppendEntriesRequest::getPreviousLog)
           .map(TermIndex::getIndex)
           .orElseGet(f::getMatchIndex);
-      if (onError && request == null) {
-        LOG.warn("{}: Follower failed and request == null, " +
-            " keep nextIndex ({}) unchanged and retry.", this, 
f.getNextIndex());
+      if (event.isError() && request == null) {
+        LOG.warn("{}: Follower failed (request=null, errorCount={}); keep 
nextIndex ({}) unchanged and retry.",
+            this, errorCount, f.getNextIndex());
         return;
       }
       if (request != null && request.isHeartbeat()) {
@@ -215,7 +282,7 @@ public class GrpcLogAppender extends LogAppenderBase {
   }
 
   private long errorWaitTimeMs() {
-    return errorRetryWaitPolicy.handleAttemptFailure(errCount::get)
+    return errorRetryWaitPolicy.handleAttemptFailure(replyState::getErrorCount)
         .getSleepTime().toLong(TimeUnit.MILLISECONDS);
   }
 
@@ -235,15 +302,17 @@ public class GrpcLogAppender extends LogAppenderBase {
     return pendingRequests.logRequestsSize() > 0;
   }
 
-  /**
-   * @return true iff not received first response or queue is full.
-   */
+  /** @return true iff either (1) queue is full, or (2) queue is non-empty and 
not received first response. */
   private boolean haveTooManyPendingRequests() {
     final int size = pendingRequests.logRequestsSize();
     if (size == 0) {
       return false;
+    } else if (size >= maxPendingRequestsNum) {
+      return true;
+    } else {
+      // queue is non-empty and non-full
+      return !replyState.isFirstReplyReceived();
     }
-    return !firstResponseReceived || size >= maxPendingRequestsNum;
   }
 
   static class StreamObservers {
@@ -354,8 +423,9 @@ public class GrpcLogAppender extends LogAppenderBase {
   private void timeoutAppendRequest(long cid, boolean heartbeat) {
     final AppendEntriesRequest pending = pendingRequests.handleTimeout(cid, 
heartbeat);
     if (pending != null) {
-      errCount.incrementAndGet();
-      LOG.warn("{}: {} appendEntries Timeout, request={}", this, heartbeat ? 
"HEARTBEAT" : "", pending);
+      final int errorCount = replyState.process(Event.TIMEOUT);
+      LOG.warn("{}: Timed out {}appendEntries, errorCount={}, request={}",
+          this, heartbeat ? "HEARTBEAT " : "", errorCount, pending);
       grpcServerMetrics.onRequestTimeout(getFollowerId().toString(), 
heartbeat);
       pending.stopRequestTimer();
     }
@@ -399,7 +469,7 @@ public class GrpcLogAppender extends LogAppenderBase {
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("{}: received {} reply {}, request={}",
-            this, firstResponseReceived? "a": "the first",
+            this, replyState.isFirstReplyReceived()? "a": "the first",
             ServerStringUtils.toAppendEntriesReplyString(reply), request);
       }
 
@@ -412,11 +482,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     }
 
     private void onNextImpl(AppendEntriesRequest request, 
AppendEntriesReplyProto reply) {
-      errCount.set(0);
-
-      if (!firstResponseReceived) {
-        firstResponseReceived = true;
-      }
+      final int errorCount = replyState.process(reply.getResult());
 
       switch (reply.getResult()) {
         case SUCCESS:
@@ -436,8 +502,8 @@ public class GrpcLogAppender extends LogAppenderBase {
           break;
         case INCONSISTENCY:
           grpcServerMetrics.onRequestInconsistency(getFollowerId().toString());
-          LOG.warn("{}: received {} reply with nextIndex {}, request={}",
-              this, reply.getResult(), reply.getNextIndex(), request);
+          LOG.warn("{}: received {} reply with nextIndex {}, errorCount={}, 
request={}",
+              this, reply.getResult(), reply.getNextIndex(), errorCount, 
request);
           final long requestFirstIndex = request != null? 
request.getFirstIndex(): RaftLog.INVALID_LOG_INDEX;
           updateNextIndex(getNextIndexForInconsistency(requestFirstIndex, 
reply.getNextIndex()));
           break;
@@ -457,18 +523,16 @@ public class GrpcLogAppender extends LogAppenderBase {
         LOG.info("{} is already stopped", GrpcLogAppender.this);
         return;
       }
-      errCount.incrementAndGet();
       GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t);
       grpcServerMetrics.onRequestRetry(); // Update try counter
       AppendEntriesRequest request = 
pendingRequests.remove(GrpcUtil.getCallId(t), GrpcUtil.isHeartbeat(t));
-      resetClient(request, true);
+      resetClient(request, Event.ERROR);
     }
 
     @Override
     public void onCompleted() {
       LOG.info("{}: follower responses appendEntries COMPLETED", this);
-      errCount.set(0);
-      resetClient(null, false);
+      resetClient(null, Event.COMPLETE);
     }
 
     @Override
@@ -546,17 +610,14 @@ public class GrpcLogAppender extends LogAppenderBase {
     @Override
     public void onNext(InstallSnapshotReplyProto reply) {
       if (LOG.isInfoEnabled()) {
-        LOG.info("{}: received {} reply {}", this, firstResponseReceived ? "a" 
: "the first",
+        LOG.info("{}: received {} reply {}", this, 
replyState.isFirstReplyReceived()? "a" : "the first",
             ServerStringUtils.toInstallSnapshotReplyString(reply));
       }
 
       // update the last rpc time
       getFollower().updateLastRpcResponseTime();
-      errCount.set(0);
+      replyState.process(Event.SNAPSHOT_REPLY);
 
-      if (!firstResponseReceived) {
-        firstResponseReceived = true;
-      }
       final long followerSnapshotIndex;
       switch (reply.getResult()) {
         case SUCCESS:
@@ -616,10 +677,9 @@ public class GrpcLogAppender extends LogAppenderBase {
         LOG.info("{} is stopped", GrpcLogAppender.this);
         return;
       }
-      errCount.incrementAndGet();
       GrpcUtil.warn(LOG, () -> this + ": Failed InstallSnapshot", t);
       grpcServerMetrics.onRequestRetry(); // Update try counter
-      resetClient(null, true);
+      resetClient(null, Event.ERROR);
       close();
     }
 
@@ -628,7 +688,7 @@ public class GrpcLogAppender extends LogAppenderBase {
       if (!isNotificationOnly || LOG.isDebugEnabled()) {
         LOG.info("{}: follower responded installSnapshot COMPLETED", this);
       }
-      errCount.set(0);
+      replyState.process(Event.COMPLETE);
       close();
     }
 

Reply via email to