This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 287a4ac7c RATIS-2137. Fix LogAppenderDefault in handling of INCONSISTENCY. (#1136)
287a4ac7c is described below
commit 287a4ac7c467c5a5132c0e85bc83923e27a0f064
Author: Flyangz <[email protected]>
AuthorDate: Thu Aug 22 00:10:04 2024 +0800
RATIS-2137. Fix LogAppenderDefault in handling of INCONSISTENCY. (#1136)
---
.../ratis/server/leader/LogAppenderDefault.java | 21 ++++++++++++++-------
1 file changed, 14 insertions(+), 7 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 432a41992..f75a80f82 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -23,6 +23,7 @@ import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.rpc.CallId;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
@@ -34,6 +35,7 @@ import java.io.InterruptedIOException;
import java.util.Comparator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* The default implementation of {@link LogAppender}
@@ -55,7 +57,7 @@ class LogAppenderDefault extends LogAppenderBase {
}
/** Send an appendEntries RPC; retry indefinitely. */
- private AppendEntriesReplyProto sendAppendEntriesWithRetries()
+ private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestFirstIndex)
throws InterruptedException, InterruptedIOException, RaftLogIOException {
int retry = 0;
@@ -78,9 +80,12 @@ class LogAppenderDefault extends LogAppenderBase {
return null;
}
- AppendEntriesReplyProto r = sendAppendEntries(request.get());
+ final AppendEntriesRequestProto proto = request.get();
+ final AppendEntriesReplyProto reply = sendAppendEntries(proto);
+ final long first = proto.getEntriesCount() > 0 ? proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX;
+ requestFirstIndex.set(first);
request.release();
- return r;
+ return reply;
} catch (InterruptedIOException | RaftLogIOException e) {
throw e;
} catch (IOException ioe) {
@@ -164,9 +169,10 @@ class LogAppenderDefault extends LogAppenderBase {
}
// otherwise if r is null, retry the snapshot installation
} else {
- final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
+ final AtomicLong requestFirstIndex = new AtomicLong(RaftLog.INVALID_LOG_INDEX);
+ final AppendEntriesReplyProto r = sendAppendEntriesWithRetries(requestFirstIndex);
if (r != null) {
- handleReply(r);
+ handleReply(r, requestFirstIndex.get());
}
}
}
@@ -177,7 +183,8 @@ class LogAppenderDefault extends LogAppenderBase {
}
}
- private void handleReply(AppendEntriesReplyProto reply) throws IllegalArgumentException {
+ private void handleReply(AppendEntriesReplyProto reply, long requestFirstIndex)
+ throws IllegalArgumentException {
if (reply != null) {
switch (reply.getResult()) {
case SUCCESS:
@@ -200,7 +207,7 @@ class LogAppenderDefault extends LogAppenderBase {
onFollowerTerm(reply.getTerm());
break;
case INCONSISTENCY:
- getFollower().decreaseNextIndex(reply.getNextIndex());
+ getFollower().setNextIndex(getNextIndexForInconsistency(requestFirstIndex, reply.getNextIndex()));
break;
case UNRECOGNIZED:
LOG.warn("{}: received {}", this, reply.getResult());