This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f80757d RATIS-457. gRPC timed out append entry reply should update
follower match index. Contributed by Lokesh Jain
f80757d is described below
commit f80757d3df3ce15022118095cd5f623b1c89d559
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Oct 30 08:28:37 2019 -0700
RATIS-457. gRPC timed out append entry reply should update follower match
index. Contributed by Lokesh Jain
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 32 +++++++---------------
ratis-proto/src/main/proto/Raft.proto | 1 +
.../apache/ratis/server/impl/RaftServerImpl.java | 10 +++++--
.../apache/ratis/server/impl/ServerProtoUtils.java | 5 +++-
4 files changed, 22 insertions(+), 26 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 0464ff4..3d6581f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -235,40 +235,38 @@ public class GrpcLogAppender extends LogAppender {
*/
@Override
public void onNext(AppendEntriesReplyProto reply) {
- final AppendEntriesRequest request =
pendingRequests.remove(reply.getServerReply().getCallId());
+ AppendEntriesRequest request =
pendingRequests.remove(reply.getServerReply().getCallId());
+ if (request != null) {
+ request.stopRequestTimer(); // Update completion time
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("{}: received {} reply {}, request={}",
this, firstResponseReceived? "a": "the first",
ServerProtoUtils.toString(reply), request);
}
- request.stopRequestTimer(); // Update completion time
try {
- onNextImpl(request, reply);
+ onNextImpl(reply);
} catch(Throwable t) {
LOG.error("Failed onNext request=" + request
+ ", reply=" + ServerProtoUtils.toString(reply), t);
}
}
- private void onNextImpl(AppendEntriesRequest request,
AppendEntriesReplyProto reply) {
+ private void onNextImpl(AppendEntriesReplyProto reply) {
// update the last rpc time
follower.updateLastRpcResponseTime();
if (!firstResponseReceived) {
firstResponseReceived = true;
}
- if (request == null) {
- // The request is already handled (probably timeout), ignore the reply.
- LOG.warn("{}: Request not found, ignoring reply: {}", this,
ServerProtoUtils.toString(reply));
- return;
- }
switch (reply.getResult()) {
case SUCCESS:
grpcServerMetrics.onRequestSuccess(getFollowerId().toString());
updateCommitIndex(reply.getFollowerCommit());
- if (checkAndUpdateMatchIndex(request)) {
+ if (follower.updateMatchIndex(reply.getMatchIndex())) {
submitEventOnSuccessAppend();
}
break;
@@ -299,8 +297,8 @@ public class GrpcLogAppender extends LogAppender {
}
GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t);
grpcServerMetrics.onRequestRetry(); // Update try counter
- long callId = GrpcUtil.getCallId(t);
- resetClient(pendingRequests.remove(callId));
+ AppendEntriesRequest request =
pendingRequests.remove(GrpcUtil.getCallId(t));
+ resetClient(request);
}
@Override
@@ -315,10 +313,6 @@ public class GrpcLogAppender extends LogAppender {
}
}
- private boolean checkAndUpdateMatchIndex(AppendEntriesRequest request) {
- return follower.updateMatchIndex(request.getNewMatchIndex());
- }
-
private synchronized void updateNextIndex(long replyNextIndex) {
pendingRequests.clear();
follower.updateNextIndex(replyNextIndex);
@@ -545,12 +539,6 @@ public class GrpcLogAppender extends LogAppender {
return previousLog;
}
- long getNewMatchIndex() {
- return lastEntry != null? lastEntry.getIndex()
- : previousLog != null? previousLog.getIndex()
- : 0;
- }
-
void startRequestTimer() {
timerContext = timer.time();
}
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index b75569e..11c6bc5 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -175,6 +175,7 @@ message AppendEntriesReplyProto {
uint64 nextIndex = 3;
AppendResult result = 4;
uint64 followerCommit = 5;
+ uint64 matchIndex = 6;
}
message InstallSnapshotRequestProto {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index dc6a5a7..90413ea 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -924,7 +924,8 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
currentTerm = state.getCurrentTerm();
if (!recognized) {
final AppendEntriesReplyProto reply =
ServerProtoUtils.toAppendEntriesReplyProto(
- leaderId, getMemberId(), currentTerm, followerCommit,
state.getNextIndex(), NOT_LEADER, callId);
+ leaderId, getMemberId(), currentTerm, followerCommit,
state.getNextIndex(), NOT_LEADER, callId,
+ RaftLog.INVALID_LOG_INDEX);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Not recognize {} (term={}) as leader, state: {}
reply: {}",
getMemberId(), leaderId, leaderTerm, state,
ServerProtoUtils.toString(reply));
@@ -974,8 +975,10 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
synchronized(this) {
state.updateStatemachine(leaderCommit, currentTerm);
final long n = isHeartbeat? state.getLog().getNextIndex():
entries[entries.length - 1].getIndex() + 1;
+ final long matchIndex = entries.length != 0 ? entries[entries.length -
1].getIndex() :
+ previous != null ? previous.getIndex() :
RaftLog.INVALID_LOG_INDEX;
reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId,
getMemberId(), currentTerm,
- state.getLog().getLastCommittedIndex(), n, SUCCESS, callId);
+ state.getLog().getLastCommittedIndex(), n, SUCCESS, callId,
matchIndex);
}
logAppendEntries(isHeartbeat, () ->
getMemberId() + ": succeeded to handle AppendEntries. Reply: " +
ServerProtoUtils.toString(reply));
@@ -992,7 +995,8 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
}
final AppendEntriesReplyProto reply =
ServerProtoUtils.toAppendEntriesReplyProto(
- leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex,
INCONSISTENCY, callId);
+ leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex,
INCONSISTENCY, callId,
+ RaftLog.INVALID_LOG_INDEX);
LOG.info("{}: inconsistency entries. Reply:{}", getMemberId(),
ServerProtoUtils.toString(reply));
return reply;
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 5c4494e..b9c8ee6 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -399,9 +399,11 @@ public interface ServerProtoUtils {
.build();
}
+ @SuppressWarnings("parameternumber")
static AppendEntriesReplyProto toAppendEntriesReplyProto(
RaftPeerId requestorId, RaftGroupMemberId replyId, long term,
- long followerCommit, long nextIndex, AppendResult result, long callId) {
+ long followerCommit, long nextIndex, AppendResult result, long callId,
+ long matchIndex) {
RaftRpcReplyProto.Builder rpcReply = toRaftRpcReplyProtoBuilder(
requestorId, replyId, result == AppendResult.SUCCESS)
.setCallId(callId);
@@ -409,6 +411,7 @@ public interface ServerProtoUtils {
.setServerReply(rpcReply)
.setTerm(term)
.setNextIndex(nextIndex)
+ .setMatchIndex(matchIndex)
.setFollowerCommit(followerCommit)
.setResult(result).build();
}