This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c4cfd5ece RATIS-1945. Enforce min wait time only for INCONSISTENCY replies. (#975)
c4cfd5ece is described below
commit c4cfd5eced5d81c17feab82ca76efdac541a0632
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Nov 30 20:06:56 2023 -0800
RATIS-1945. Enforce min wait time only for INCONSISTENCY replies. (#975)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 130 +++++++++++++++------
1 file changed, 95 insertions(+), 35 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index b9764e987..7adbc7355 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -37,6 +37,7 @@ import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
@@ -52,7 +53,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -67,11 +67,78 @@ public class GrpcLogAppender extends LogAppenderBase {
return diff == 0? 0: diff > 0? 1: -1;
};
+ enum Event {
+ APPEND_ENTRIES_REPLY,
+ APPEND_ENTRIES_INCONSISTENCY_REPLY,
+ SNAPSHOT_REPLY,
+ COMPLETE,
+ TIMEOUT,
+ ERROR;
+
+ boolean updateFirstReplyReceived(boolean firstReplyReceived) {
+ switch (this) {
+ case APPEND_ENTRIES_REPLY:
+ case APPEND_ENTRIES_INCONSISTENCY_REPLY:
+ case SNAPSHOT_REPLY:
+ case COMPLETE:
+ return true;
+ case ERROR:
+ return false;
+ case TIMEOUT:
+ return firstReplyReceived;
+ default:
+ throw new IllegalStateException("Unexpected event: " + this);
+ }
+ }
+
+ boolean isError() {
+ switch (this) {
+ case APPEND_ENTRIES_INCONSISTENCY_REPLY:
+ case TIMEOUT:
+ case ERROR:
+ return true;
+ case APPEND_ENTRIES_REPLY:
+ case SNAPSHOT_REPLY:
+ case COMPLETE:
+ return false;
+ default:
+ throw new IllegalStateException("Unexpected event: " + this);
+ }
+ }
+ }
+
+ static class ReplyState {
+ private boolean firstReplyReceived = false;
+ private int errorCount = 0;
+
+ synchronized boolean isFirstReplyReceived() {
+ return firstReplyReceived;
+ }
+
+ synchronized int getErrorCount() {
+ return errorCount;
+ }
+
+ int process(AppendResult result) {
+ return process(result == AppendResult.INCONSISTENCY?
Event.APPEND_ENTRIES_INCONSISTENCY_REPLY
+ : Event.APPEND_ENTRIES_REPLY);
+ }
+
+ synchronized int process(Event event) {
+ firstReplyReceived = event.updateFirstReplyReceived(firstReplyReceived);
+ if (event.isError()) {
+ errorCount++;
+ } else {
+ errorCount = 0;
+ }
+ return errorCount;
+ }
+ }
+
private final AtomicLong callId = new AtomicLong();
private final RequestMap pendingRequests = new RequestMap();
private final int maxPendingRequestsNum;
- private volatile boolean firstResponseReceived = false;
private final boolean installSnapshotEnabled;
private final TimeDuration requestTimeoutDuration;
@@ -87,7 +154,7 @@ public class GrpcLogAppender extends LogAppenderBase {
private final AutoCloseableReadWriteLock lock;
private final StackTraceElement caller;
private final RetryPolicy errorRetryWaitPolicy;
- private final AtomicInteger errCount = new AtomicInteger(0);
+ private final ReplyState replyState = new ReplyState();
public GrpcLogAppender(RaftServer.Division server, LeaderState leaderState,
FollowerInfo f) {
super(server, leaderState, f);
@@ -121,14 +188,14 @@ public class GrpcLogAppender extends LogAppenderBase {
return getServerRpc().getProxies().getProxy(getFollowerId());
}
- private void resetClient(AppendEntriesRequest request, boolean onError) {
+ private void resetClient(AppendEntriesRequest request, Event event) {
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
getClient().resetConnectBackoff();
if (appendLogRequestObserver != null) {
appendLogRequestObserver.stop();
appendLogRequestObserver = null;
}
- firstResponseReceived = false;
+ final int errorCount = replyState.process(event);
// clear the pending requests queue and reset the next index of follower
pendingRequests.clear();
final FollowerInfo f = getFollower();
@@ -136,9 +203,9 @@ public class GrpcLogAppender extends LogAppenderBase {
.map(AppendEntriesRequest::getPreviousLog)
.map(TermIndex::getIndex)
.orElseGet(f::getMatchIndex);
- if (onError && request == null) {
- LOG.warn("{}: Follower failed and request == null, " +
- " keep nextIndex ({}) unchanged and retry.", this,
f.getNextIndex());
+ if (event.isError() && request == null) {
+ LOG.warn("{}: Follower failed (request=null, errorCount={}); keep
nextIndex ({}) unchanged and retry.",
+ this, errorCount, f.getNextIndex());
return;
}
if (request != null && request.isHeartbeat()) {
@@ -215,7 +282,7 @@ public class GrpcLogAppender extends LogAppenderBase {
}
private long errorWaitTimeMs() {
- return errorRetryWaitPolicy.handleAttemptFailure(errCount::get)
+ return errorRetryWaitPolicy.handleAttemptFailure(replyState::getErrorCount)
.getSleepTime().toLong(TimeUnit.MILLISECONDS);
}
@@ -235,15 +302,17 @@ public class GrpcLogAppender extends LogAppenderBase {
return pendingRequests.logRequestsSize() > 0;
}
- /**
- * @return true iff not received first response or queue is full.
- */
+ /** @return true iff either (1) queue is full, or (2) queue is non-empty and
not received first response. */
private boolean haveTooManyPendingRequests() {
final int size = pendingRequests.logRequestsSize();
if (size == 0) {
return false;
+ } else if (size >= maxPendingRequestsNum) {
+ return true;
+ } else {
+ // queue is non-empty and non-full
+ return !replyState.isFirstReplyReceived();
}
- return !firstResponseReceived || size >= maxPendingRequestsNum;
}
static class StreamObservers {
@@ -354,8 +423,9 @@ public class GrpcLogAppender extends LogAppenderBase {
private void timeoutAppendRequest(long cid, boolean heartbeat) {
final AppendEntriesRequest pending = pendingRequests.handleTimeout(cid,
heartbeat);
if (pending != null) {
- errCount.incrementAndGet();
- LOG.warn("{}: {} appendEntries Timeout, request={}", this, heartbeat ?
"HEARTBEAT" : "", pending);
+ final int errorCount = replyState.process(Event.TIMEOUT);
+ LOG.warn("{}: Timed out {}appendEntries, errorCount={}, request={}",
+ this, heartbeat ? "HEARTBEAT " : "", errorCount, pending);
grpcServerMetrics.onRequestTimeout(getFollowerId().toString(),
heartbeat);
pending.stopRequestTimer();
}
@@ -399,7 +469,7 @@ public class GrpcLogAppender extends LogAppenderBase {
if (LOG.isDebugEnabled()) {
LOG.debug("{}: received {} reply {}, request={}",
- this, firstResponseReceived? "a": "the first",
+ this, replyState.isFirstReplyReceived()? "a": "the first",
ServerStringUtils.toAppendEntriesReplyString(reply), request);
}
@@ -412,11 +482,7 @@ public class GrpcLogAppender extends LogAppenderBase {
}
private void onNextImpl(AppendEntriesRequest request,
AppendEntriesReplyProto reply) {
- errCount.set(0);
-
- if (!firstResponseReceived) {
- firstResponseReceived = true;
- }
+ final int errorCount = replyState.process(reply.getResult());
switch (reply.getResult()) {
case SUCCESS:
@@ -436,8 +502,8 @@ public class GrpcLogAppender extends LogAppenderBase {
break;
case INCONSISTENCY:
grpcServerMetrics.onRequestInconsistency(getFollowerId().toString());
- LOG.warn("{}: received {} reply with nextIndex {}, request={}",
- this, reply.getResult(), reply.getNextIndex(), request);
+ LOG.warn("{}: received {} reply with nextIndex {}, errorCount={},
request={}",
+ this, reply.getResult(), reply.getNextIndex(), errorCount,
request);
final long requestFirstIndex = request != null?
request.getFirstIndex(): RaftLog.INVALID_LOG_INDEX;
updateNextIndex(getNextIndexForInconsistency(requestFirstIndex,
reply.getNextIndex()));
break;
@@ -457,18 +523,16 @@ public class GrpcLogAppender extends LogAppenderBase {
LOG.info("{} is already stopped", GrpcLogAppender.this);
return;
}
- errCount.incrementAndGet();
GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t);
grpcServerMetrics.onRequestRetry(); // Update try counter
AppendEntriesRequest request =
pendingRequests.remove(GrpcUtil.getCallId(t), GrpcUtil.isHeartbeat(t));
- resetClient(request, true);
+ resetClient(request, Event.ERROR);
}
@Override
public void onCompleted() {
LOG.info("{}: follower responses appendEntries COMPLETED", this);
- errCount.set(0);
- resetClient(null, false);
+ resetClient(null, Event.COMPLETE);
}
@Override
@@ -546,17 +610,14 @@ public class GrpcLogAppender extends LogAppenderBase {
@Override
public void onNext(InstallSnapshotReplyProto reply) {
if (LOG.isInfoEnabled()) {
- LOG.info("{}: received {} reply {}", this, firstResponseReceived ? "a"
: "the first",
+ LOG.info("{}: received {} reply {}", this,
replyState.isFirstReplyReceived()? "a" : "the first",
ServerStringUtils.toInstallSnapshotReplyString(reply));
}
// update the last rpc time
getFollower().updateLastRpcResponseTime();
- errCount.set(0);
+ replyState.process(Event.SNAPSHOT_REPLY);
- if (!firstResponseReceived) {
- firstResponseReceived = true;
- }
final long followerSnapshotIndex;
switch (reply.getResult()) {
case SUCCESS:
@@ -616,10 +677,9 @@ public class GrpcLogAppender extends LogAppenderBase {
LOG.info("{} is stopped", GrpcLogAppender.this);
return;
}
- errCount.incrementAndGet();
GrpcUtil.warn(LOG, () -> this + ": Failed InstallSnapshot", t);
grpcServerMetrics.onRequestRetry(); // Update try counter
- resetClient(null, true);
+ resetClient(null, Event.ERROR);
close();
}
@@ -628,7 +688,7 @@ public class GrpcLogAppender extends LogAppenderBase {
if (!isNotificationOnly || LOG.isDebugEnabled()) {
LOG.info("{}: follower responded installSnapshot COMPLETED", this);
}
- errCount.set(0);
+ replyState.process(Event.COMPLETE);
close();
}