This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-3.1.1_review
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 099d23f258153917d2ba63f0d72f38504eb5fd10
Author: Flyangz <[email protected]>
AuthorDate: Thu Aug 22 00:10:04 2024 +0800

    RATIS-2137. Fix LogAppenderDefault in handling of INCONSISTENCY. (#1136)
---
 .../ratis/server/leader/LogAppenderDefault.java    | 23 ++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 21ef70d4d..1b21bb7e4 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -23,6 +23,7 @@ import 
org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.util.ServerStringUtils;
 import org.apache.ratis.statemachine.SnapshotInfo;
@@ -33,6 +34,7 @@ import java.io.InterruptedIOException;
 import java.util.Comparator;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * The default implementation of {@link LogAppender}
@@ -54,7 +56,7 @@ class LogAppenderDefault extends LogAppenderBase {
   }
 
   /** Send an appendEntries RPC; retry indefinitely. */
-  private AppendEntriesReplyProto sendAppendEntriesWithRetries()
+  private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong 
requestFirstIndex)
       throws InterruptedException, InterruptedIOException, RaftLogIOException {
     int retry = 0;
 
@@ -76,12 +78,15 @@ class LogAppenderDefault extends LogAppenderBase {
         resetHeartbeatTrigger();
         final Timestamp sendTime = Timestamp.currentTime();
         getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0);
-        final AppendEntriesReplyProto r = 
getServerRpc().appendEntries(request);
+        final AppendEntriesRequestProto proto = request;
+        final AppendEntriesReplyProto reply = 
getServerRpc().appendEntries(proto);
+        final long first = proto.getEntriesCount() > 0 ? 
proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX;
+        requestFirstIndex.set(first);
         getFollower().updateLastRpcResponseTime();
         getFollower().updateLastRespondedAppendEntriesSendTime(sendTime);
 
-        getLeaderState().onFollowerCommitIndex(getFollower(), 
r.getFollowerCommit());
-        return r;
+        getLeaderState().onFollowerCommitIndex(getFollower(), 
reply.getFollowerCommit());
+        return reply;
       } catch (InterruptedIOException | RaftLogIOException e) {
         throw e;
       } catch (IOException ioe) {
@@ -153,9 +158,10 @@ class LogAppenderDefault extends LogAppenderBase {
           }
           // otherwise if r is null, retry the snapshot installation
         } else {
-          final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
+          final AtomicLong requestFirstIndex = new 
AtomicLong(RaftLog.INVALID_LOG_INDEX);
+          final AppendEntriesReplyProto r = 
sendAppendEntriesWithRetries(requestFirstIndex);
           if (r != null) {
-            handleReply(r);
+            handleReply(r, requestFirstIndex.get());
           }
         }
       }
@@ -166,7 +172,8 @@ class LogAppenderDefault extends LogAppenderBase {
     }
   }
 
-  private void handleReply(AppendEntriesReplyProto reply) throws 
IllegalArgumentException {
+  private void handleReply(AppendEntriesReplyProto reply, long 
requestFirstIndex)
+      throws IllegalArgumentException {
     if (reply != null) {
       switch (reply.getResult()) {
         case SUCCESS:
@@ -189,7 +196,7 @@ class LogAppenderDefault extends LogAppenderBase {
           onFollowerTerm(reply.getTerm());
           break;
         case INCONSISTENCY:
-          getFollower().decreaseNextIndex(reply.getNextIndex());
+          
getFollower().setNextIndex(getNextIndexForInconsistency(requestFirstIndex, 
reply.getNextIndex()));
           break;
         case UNRECOGNIZED:
           LOG.warn("{}: received {}", this, reply.getResult());

Reply via email to