This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-3.1.1_review in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 099d23f258153917d2ba63f0d72f38504eb5fd10 Author: Flyangz <[email protected]> AuthorDate: Thu Aug 22 00:10:04 2024 +0800 RATIS-2137. Fix LogAppenderDefault in handling of INCONSISTENCY. (#1136) --- .../ratis/server/leader/LogAppenderDefault.java | 23 ++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java index 21ef70d4d..1b21bb7e4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java @@ -23,6 +23,7 @@ import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto; import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; import org.apache.ratis.rpc.CallId; import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.server.raftlog.RaftLogIOException; import org.apache.ratis.server.util.ServerStringUtils; import org.apache.ratis.statemachine.SnapshotInfo; @@ -33,6 +34,7 @@ import java.io.InterruptedIOException; import java.util.Comparator; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * The default implementation of {@link LogAppender} @@ -54,7 +56,7 @@ class LogAppenderDefault extends LogAppenderBase { } /** Send an appendEntries RPC; retry indefinitely. 
*/ - private AppendEntriesReplyProto sendAppendEntriesWithRetries() + private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestFirstIndex) throws InterruptedException, InterruptedIOException, RaftLogIOException { int retry = 0; @@ -76,12 +78,15 @@ class LogAppenderDefault extends LogAppenderBase { resetHeartbeatTrigger(); final Timestamp sendTime = Timestamp.currentTime(); getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0); - final AppendEntriesReplyProto r = getServerRpc().appendEntries(request); + final AppendEntriesRequestProto proto = request; + final AppendEntriesReplyProto reply = getServerRpc().appendEntries(proto); + final long first = proto.getEntriesCount() > 0 ? proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX; + requestFirstIndex.set(first); getFollower().updateLastRpcResponseTime(); getFollower().updateLastRespondedAppendEntriesSendTime(sendTime); - getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit()); - return r; + getLeaderState().onFollowerCommitIndex(getFollower(), reply.getFollowerCommit()); + return reply; } catch (InterruptedIOException | RaftLogIOException e) { throw e; } catch (IOException ioe) { @@ -153,9 +158,10 @@ class LogAppenderDefault extends LogAppenderBase { } // otherwise if r is null, retry the snapshot installation } else { - final AppendEntriesReplyProto r = sendAppendEntriesWithRetries(); + final AtomicLong requestFirstIndex = new AtomicLong(RaftLog.INVALID_LOG_INDEX); + final AppendEntriesReplyProto r = sendAppendEntriesWithRetries(requestFirstIndex); if (r != null) { - handleReply(r); + handleReply(r, requestFirstIndex.get()); } } } @@ -166,7 +172,8 @@ class LogAppenderDefault extends LogAppenderBase { } } - private void handleReply(AppendEntriesReplyProto reply) throws IllegalArgumentException { + private void handleReply(AppendEntriesReplyProto reply, long requestFirstIndex) + throws IllegalArgumentException { if (reply != null) { switch 
(reply.getResult()) { case SUCCESS: @@ -189,7 +196,7 @@ class LogAppenderDefault extends LogAppenderBase { onFollowerTerm(reply.getTerm()); break; case INCONSISTENCY: - getFollower().decreaseNextIndex(reply.getNextIndex()); + getFollower().setNextIndex(getNextIndexForInconsistency(requestFirstIndex, reply.getNextIndex())); break; case UNRECOGNIZED: LOG.warn("{}: received {}", this, reply.getResult());
