This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 287a4ac7c RATIS-2137. Fix LogAppenderDefault in handling of 
INCONSISTENCY. (#1136)
287a4ac7c is described below

commit 287a4ac7c467c5a5132c0e85bc83923e27a0f064
Author: Flyangz <[email protected]>
AuthorDate: Thu Aug 22 00:10:04 2024 +0800

    RATIS-2137. Fix LogAppenderDefault in handling of INCONSISTENCY. (#1136)
---
 .../ratis/server/leader/LogAppenderDefault.java     | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 432a41992..f75a80f82 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -23,6 +23,7 @@ import 
org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.util.ServerStringUtils;
 import org.apache.ratis.statemachine.SnapshotInfo;
@@ -34,6 +35,7 @@ import java.io.InterruptedIOException;
 import java.util.Comparator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * The default implementation of {@link LogAppender}
@@ -55,7 +57,7 @@ class LogAppenderDefault extends LogAppenderBase {
   }
 
   /** Send an appendEntries RPC; retry indefinitely. */
-  private AppendEntriesReplyProto sendAppendEntriesWithRetries()
+  private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong 
requestFirstIndex)
       throws InterruptedException, InterruptedIOException, RaftLogIOException {
     int retry = 0;
 
@@ -78,9 +80,12 @@ class LogAppenderDefault extends LogAppenderBase {
           return null;
         }
 
-        AppendEntriesReplyProto r = sendAppendEntries(request.get());
+        final AppendEntriesRequestProto proto = request.get();
+        final AppendEntriesReplyProto reply = sendAppendEntries(proto);
+        final long first = proto.getEntriesCount() > 0 ? 
proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX;
+        requestFirstIndex.set(first);
         request.release();
-        return r;
+        return reply;
       } catch (InterruptedIOException | RaftLogIOException e) {
         throw e;
       } catch (IOException ioe) {
@@ -164,9 +169,10 @@ class LogAppenderDefault extends LogAppenderBase {
           }
           // otherwise if r is null, retry the snapshot installation
         } else {
-          final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
+          final AtomicLong requestFirstIndex = new 
AtomicLong(RaftLog.INVALID_LOG_INDEX);
+          final AppendEntriesReplyProto r = 
sendAppendEntriesWithRetries(requestFirstIndex);
           if (r != null) {
-            handleReply(r);
+            handleReply(r, requestFirstIndex.get());
           }
         }
       }
@@ -177,7 +183,8 @@ class LogAppenderDefault extends LogAppenderBase {
     }
   }
 
-  private void handleReply(AppendEntriesReplyProto reply) throws 
IllegalArgumentException {
+  private void handleReply(AppendEntriesReplyProto reply, long 
requestFirstIndex)
+      throws IllegalArgumentException {
     if (reply != null) {
       switch (reply.getResult()) {
         case SUCCESS:
@@ -200,7 +207,7 @@ class LogAppenderDefault extends LogAppenderBase {
           onFollowerTerm(reply.getTerm());
           break;
         case INCONSISTENCY:
-          getFollower().decreaseNextIndex(reply.getNextIndex());
+          
getFollower().setNextIndex(getNextIndexForInconsistency(requestFirstIndex, 
reply.getNextIndex()));
           break;
         case UNRECOGNIZED:
           LOG.warn("{}: received {}", this, reply.getResult());

Reply via email to