This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f80757d  RATIS-457. gRPC timed out append entry reply should update 
follower match index.  Contributed by Lokesh Jain
f80757d is described below

commit f80757d3df3ce15022118095cd5f623b1c89d559
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Oct 30 08:28:37 2019 -0700

    RATIS-457. gRPC timed out append entry reply should update follower match 
index.  Contributed by Lokesh Jain
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 32 +++++++---------------
 ratis-proto/src/main/proto/Raft.proto              |  1 +
 .../apache/ratis/server/impl/RaftServerImpl.java   | 10 +++++--
 .../apache/ratis/server/impl/ServerProtoUtils.java |  5 +++-
 4 files changed, 22 insertions(+), 26 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 0464ff4..3d6581f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -235,40 +235,38 @@ public class GrpcLogAppender extends LogAppender {
      */
     @Override
     public void onNext(AppendEntriesReplyProto reply) {
-      final AppendEntriesRequest request = 
pendingRequests.remove(reply.getServerReply().getCallId());
+      AppendEntriesRequest request = 
pendingRequests.remove(reply.getServerReply().getCallId());
+      if (request != null) {
+        request.stopRequestTimer(); // Update completion time
+      }
+
       if (LOG.isDebugEnabled()) {
         LOG.debug("{}: received {} reply {}, request={}",
             this, firstResponseReceived? "a": "the first",
             ServerProtoUtils.toString(reply), request);
       }
-      request.stopRequestTimer(); // Update completion time
 
       try {
-        onNextImpl(request, reply);
+        onNextImpl(reply);
       } catch(Throwable t) {
         LOG.error("Failed onNext request=" + request
             + ", reply=" + ServerProtoUtils.toString(reply), t);
       }
     }
 
-    private void onNextImpl(AppendEntriesRequest request, 
AppendEntriesReplyProto reply) {
+    private void onNextImpl(AppendEntriesReplyProto reply) {
       // update the last rpc time
       follower.updateLastRpcResponseTime();
 
       if (!firstResponseReceived) {
         firstResponseReceived = true;
       }
-      if (request == null) {
-        // The request is already handled (probably timeout), ignore the reply.
-        LOG.warn("{}: Request not found, ignoring reply: {}", this, 
ServerProtoUtils.toString(reply));
-        return;
-      }
 
       switch (reply.getResult()) {
         case SUCCESS:
           grpcServerMetrics.onRequestSuccess(getFollowerId().toString());
           updateCommitIndex(reply.getFollowerCommit());
-          if (checkAndUpdateMatchIndex(request)) {
+          if (follower.updateMatchIndex(reply.getMatchIndex())) {
             submitEventOnSuccessAppend();
           }
           break;
@@ -299,8 +297,8 @@ public class GrpcLogAppender extends LogAppender {
       }
       GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t);
       grpcServerMetrics.onRequestRetry(); // Update try counter
-      long callId = GrpcUtil.getCallId(t);
-      resetClient(pendingRequests.remove(callId));
+      AppendEntriesRequest request = 
pendingRequests.remove(GrpcUtil.getCallId(t));
+      resetClient(request);
     }
 
     @Override
@@ -315,10 +313,6 @@ public class GrpcLogAppender extends LogAppender {
     }
   }
 
-  private boolean checkAndUpdateMatchIndex(AppendEntriesRequest request) {
-    return follower.updateMatchIndex(request.getNewMatchIndex());
-  }
-
   private synchronized void updateNextIndex(long replyNextIndex) {
     pendingRequests.clear();
     follower.updateNextIndex(replyNextIndex);
@@ -545,12 +539,6 @@ public class GrpcLogAppender extends LogAppender {
       return previousLog;
     }
 
-    long getNewMatchIndex() {
-      return lastEntry != null? lastEntry.getIndex()
-          : previousLog != null? previousLog.getIndex()
-          : 0;
-    }
-
     void startRequestTimer() {
       timerContext = timer.time();
     }
diff --git a/ratis-proto/src/main/proto/Raft.proto 
b/ratis-proto/src/main/proto/Raft.proto
index b75569e..11c6bc5 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -175,6 +175,7 @@ message AppendEntriesReplyProto {
   uint64 nextIndex = 3;
   AppendResult result = 4;
   uint64 followerCommit = 5;
+  uint64 matchIndex = 6;
 }
 
 message InstallSnapshotRequestProto {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index dc6a5a7..90413ea 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -924,7 +924,8 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       currentTerm = state.getCurrentTerm();
       if (!recognized) {
         final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
-            leaderId, getMemberId(), currentTerm, followerCommit, 
state.getNextIndex(), NOT_LEADER, callId);
+            leaderId, getMemberId(), currentTerm, followerCommit, 
state.getNextIndex(), NOT_LEADER, callId,
+            RaftLog.INVALID_LOG_INDEX);
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} 
reply: {}",
               getMemberId(), leaderId, leaderTerm, state, 
ServerProtoUtils.toString(reply));
@@ -974,8 +975,10 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       synchronized(this) {
         state.updateStatemachine(leaderCommit, currentTerm);
         final long n = isHeartbeat? state.getLog().getNextIndex(): 
entries[entries.length - 1].getIndex() + 1;
+        final long matchIndex = entries.length != 0 ? entries[entries.length - 
1].getIndex() :
+                previous != null ? previous.getIndex() : 
RaftLog.INVALID_LOG_INDEX;
         reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, 
getMemberId(), currentTerm,
-            state.getLog().getLastCommittedIndex(), n, SUCCESS, callId);
+            state.getLog().getLastCommittedIndex(), n, SUCCESS, callId, 
matchIndex);
       }
       logAppendEntries(isHeartbeat, () ->
           getMemberId() + ": succeeded to handle AppendEntries. Reply: " + 
ServerProtoUtils.toString(reply));
@@ -992,7 +995,8 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     }
 
     final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
-        leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex, 
INCONSISTENCY, callId);
+        leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex, 
INCONSISTENCY, callId,
+        RaftLog.INVALID_LOG_INDEX);
     LOG.info("{}: inconsistency entries. Reply:{}", getMemberId(), 
ServerProtoUtils.toString(reply));
     return reply;
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 5c4494e..b9c8ee6 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -399,9 +399,11 @@ public interface ServerProtoUtils {
         .build();
   }
 
+  @SuppressWarnings("parameternumber")
   static AppendEntriesReplyProto toAppendEntriesReplyProto(
       RaftPeerId requestorId, RaftGroupMemberId replyId, long term,
-      long followerCommit, long nextIndex, AppendResult result, long callId) {
+      long followerCommit, long nextIndex, AppendResult result, long callId,
+      long matchIndex) {
     RaftRpcReplyProto.Builder rpcReply = toRaftRpcReplyProtoBuilder(
         requestorId, replyId, result == AppendResult.SUCCESS)
         .setCallId(callId);
@@ -409,6 +411,7 @@ public interface ServerProtoUtils {
         .setServerReply(rpcReply)
         .setTerm(term)
         .setNextIndex(nextIndex)
+        .setMatchIndex(matchIndex)
         .setFollowerCommit(followerCommit)
         .setResult(result).build();
   }

Reply via email to