This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new b7ffa1ba1 RATIS-1909. Fix Decreasing Next Index When GrpcLogAppender 
Reset Client. (#939)
b7ffa1ba1 is described below

commit b7ffa1ba1e3e7cecd9ea687f72425c2ffd5b1c34
Author: Brokenice0415 <[email protected]>
AuthorDate: Thu Oct 19 00:49:59 2023 +0800

    RATIS-1909. Fix Decreasing Next Index When GrpcLogAppender Reset Client. 
(#939)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java    |  2 +-
 .../org/apache/ratis/server/leader/FollowerInfo.java |  5 +++++
 .../apache/ratis/server/impl/FollowerInfoImpl.java   |  7 +++++++
 .../apache/ratis/server/leader/LogAppenderBase.java  | 20 ++++++++++++++++++++
 4 files changed, 33 insertions(+), 1 deletion(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 5c07d7f6d..5d83259ca 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -143,7 +143,7 @@ public class GrpcLogAppender extends LogAppenderBase {
       if (request != null && request.isHeartbeat()) {
         return;
       }
-      getFollower().decreaseNextIndex(nextIndex);
+      getFollower().computeNextIndex(getNextIndexForError(nextIndex));
     } catch (IOException ie) {
       LOG.warn(this + ": Failed to getClient for " + getFollowerId(), ie);
     }
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
index 9d5c891d9..e56374512 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
@@ -23,6 +23,8 @@ import org.apache.ratis.util.Timestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.function.LongUnaryOperator;
+
 /**
  * Information of a follower, provided the local server is the Leader
  */
@@ -84,6 +86,9 @@ public interface FollowerInfo {
   /** Update the nextIndex for this follower. */
   void updateNextIndex(long newNextIndex);
 
+  /** Set the nextIndex for this follower. */
+  void computeNextIndex(LongUnaryOperator op);
+
   /** @return the lastRpcResponseTime . */
   Timestamp getLastRpcResponseTime();
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 9394398ac..7c34c1cb8 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -27,6 +27,7 @@ import org.apache.ratis.util.Timestamp;
 
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.function.LongUnaryOperator;
 
 class FollowerInfoImpl implements FollowerInfo {
   private final String name;
@@ -135,6 +136,12 @@ class FollowerInfoImpl implements FollowerInfo {
         message -> debug("updateNextIndex", message));
   }
 
+  @Override
+  public void computeNextIndex(LongUnaryOperator op) {
+    nextIndex.updateUnconditionally(op,
+        message -> info("computeNextIndex", message));
+  }
+
   @Override
   public void setSnapshotIndex(long newSnapshotIndex) {
     snapshotIndex.setUnconditionally(newSnapshotIndex, this::info);
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index c685f2f32..b218261da 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongUnaryOperator;
 
 /**
  * An abstract implementation of {@link LogAppender}.
@@ -197,6 +198,25 @@ public abstract class LogAppenderBase implements 
LogAppender {
     return next;
   }
 
+  protected LongUnaryOperator getNextIndexForError(long newNextIndex) {
+    return oldNextIndex -> {
+      final long m = getFollower().getMatchIndex() + 1;
+      final long n = oldNextIndex <= 0L ? oldNextIndex : Math.min(oldNextIndex 
- 1, newNextIndex);
+      if (m > n) {
+        if (m > newNextIndex) {
+          LOG.info("Set nextIndex to matchIndex + 1 (= " + m + ")");
+        }
+        return m;
+      } else if (oldNextIndex <= 0L) {
+        return oldNextIndex; // no change.
+      } else {
+        LOG.info("Decrease nextIndex to " + n);
+        return n;
+      }
+    };
+  }
+
+
   @Override
   public AppendEntriesRequestProto newAppendEntriesRequest(long callId, 
boolean heartbeat)
       throws RaftLogIOException {

Reply via email to