This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b7ffa1ba1 RATIS-1909. Fix Decreasing Next Index When GrpcLogAppender Reset Client. (#939)
b7ffa1ba1 is described below
commit b7ffa1ba1e3e7cecd9ea687f72425c2ffd5b1c34
Author: Brokenice0415 <[email protected]>
AuthorDate: Thu Oct 19 00:49:59 2023 +0800
RATIS-1909. Fix Decreasing Next Index When GrpcLogAppender Reset Client. (#939)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 2 +-
.../org/apache/ratis/server/leader/FollowerInfo.java | 5 +++++
.../apache/ratis/server/impl/FollowerInfoImpl.java | 7 +++++++
.../apache/ratis/server/leader/LogAppenderBase.java | 20 ++++++++++++++++++++
4 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 5c07d7f6d..5d83259ca 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -143,7 +143,7 @@ public class GrpcLogAppender extends LogAppenderBase {
if (request != null && request.isHeartbeat()) {
return;
}
- getFollower().decreaseNextIndex(nextIndex);
+ getFollower().computeNextIndex(getNextIndexForError(nextIndex));
} catch (IOException ie) {
LOG.warn(this + ": Failed to getClient for " + getFollowerId(), ie);
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
index 9d5c891d9..e56374512 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
@@ -23,6 +23,8 @@ import org.apache.ratis.util.Timestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.function.LongUnaryOperator;
+
/**
* Information of a follower, provided the local server is the Leader
*/
@@ -84,6 +86,9 @@ public interface FollowerInfo {
/** Update the nextIndex for this follower. */
void updateNextIndex(long newNextIndex);
+ /** Set the nextIndex for this follower. */
+ void computeNextIndex(LongUnaryOperator op);
+
/** @return the lastRpcResponseTime . */
Timestamp getLastRpcResponseTime();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 9394398ac..7c34c1cb8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -27,6 +27,7 @@ import org.apache.ratis.util.Timestamp;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import java.util.function.LongUnaryOperator;
class FollowerInfoImpl implements FollowerInfo {
private final String name;
@@ -135,6 +136,12 @@ class FollowerInfoImpl implements FollowerInfo {
message -> debug("updateNextIndex", message));
}
+ @Override
+ public void computeNextIndex(LongUnaryOperator op) {
+ nextIndex.updateUnconditionally(op,
+ message -> info("computeNextIndex", message));
+ }
+
@Override
public void setSnapshotIndex(long newSnapshotIndex) {
snapshotIndex.setUnconditionally(newSnapshotIndex, this::info);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index c685f2f32..b218261da 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongUnaryOperator;
/**
* An abstract implementation of {@link LogAppender}.
@@ -197,6 +198,25 @@ public abstract class LogAppenderBase implements LogAppender {
return next;
}
+ protected LongUnaryOperator getNextIndexForError(long newNextIndex) {
+ return oldNextIndex -> {
+ final long m = getFollower().getMatchIndex() + 1;
+ final long n = oldNextIndex <= 0L ? oldNextIndex : Math.min(oldNextIndex - 1, newNextIndex);
+ if (m > n) {
+ if (m > newNextIndex) {
+ LOG.info("Set nextIndex to matchIndex + 1 (= " + m + ")");
+ }
+ return m;
+ } else if (oldNextIndex <= 0L) {
+ return oldNextIndex; // no change.
+ } else {
+ LOG.info("Decrease nextIndex to " + n);
+ return n;
+ }
+ };
+ }
+
+
@Override
public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat)
throws RaftLogIOException {