Repository: incubator-ratis
Updated Branches:
  refs/heads/master b0dc99205 -> c3845bc3f


RATIS-341. Raft log index on the follower should be applied to state machine 
only after writing the log.  Contributed by Mukul Kumar Singh


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c3845bc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c3845bc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c3845bc3

Branch: refs/heads/master
Commit: c3845bc3f3044f513347d8fec220e4e289a0ea5b
Parents: b0dc992
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Sun Oct 7 05:34:53 2018 +0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Sun Oct 7 05:34:53 2018 +0800

----------------------------------------------------------------------
 .../ratis/grpc/server/GrpcLogAppender.java      |  2 +-
 ratis-proto/src/main/proto/Raft.proto           |  1 +
 .../apache/ratis/server/impl/LogAppender.java   |  2 +-
 .../ratis/server/impl/RaftServerImpl.java       | 48 +++++++++-----------
 .../ratis/server/impl/ServerProtoUtils.java     |  6 ++-
 .../apache/ratis/server/storage/RaftLog.java    |  5 +-
 6 files changed, 32 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3845bc3/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 66c9948..11ff4d8 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -266,7 +266,7 @@ public class GrpcLogAppender extends LogAppender {
       LOG.warn("{}: Request not found, ignoring reply: {}", this, 
ServerProtoUtils.toString(reply));
       return;
     }
-    updateCommitIndex(request.getLeaderCommit());
+    updateCommitIndex(reply.getFollowerCommit());
 
     final long replyNextIndex = reply.getNextIndex();
     final long lastIndex = replyNextIndex - 1;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3845bc3/ratis-proto/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 535914d..e0916fd 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -147,6 +147,7 @@ message AppendEntriesReplyProto {
   uint64 term = 2;
   uint64 nextIndex = 3;
   AppendResult result = 4;
+  uint64 followerCommit = 5;
 }
 
 message InstallSnapshotRequestProto {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3845bc3/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 3c9b2d4..4dff3e5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -238,7 +238,7 @@ public class LogAppender {
         final AppendEntriesReplyProto r = 
server.getServerRpc().appendEntries(request);
         follower.updateLastRpcResponseTime();
 
-        updateCommitIndex(request.getLeaderCommit());
+        updateCommitIndex(r.getFollowerCommit());
         return r;
       } catch (InterruptedIOException | RaftLogIOException e) {
         throw e;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3845bc3/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d4b32a1..5e7bd89 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -895,12 +895,13 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
 
     final long currentTerm;
     long nextIndex = state.getLog().getNextIndex();
+    long followerCommit = state.getLog().getLastCommittedIndex();
     synchronized (this) {
       final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
       currentTerm = state.getCurrentTerm();
       if (!recognized) {
         final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
-            leaderId, getId(), groupId, currentTerm, nextIndex, NOT_LEADER, 
callId);
+            leaderId, getId(), groupId, currentTerm, followerCommit, 
nextIndex, NOT_LEADER, callId);
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} 
reply: {}",
               getId(), leaderId, leaderTerm, state, 
ProtoUtils.toString(reply));
@@ -924,9 +925,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       // last index included in snapshot. This is because indices <= snapshot's
       // last index should have been committed.
       if (previous != null && !containPrevious(previous)) {
-        final AppendEntriesReplyProto reply =
-            ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), 
groupId,
-                currentTerm, Math.min(nextIndex, previous.getIndex()), 
INCONSISTENCY, callId);
+        final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
+            leaderId, getId(), groupId, currentTerm, followerCommit, 
Math.min(nextIndex, previous.getIndex()),
+            INCONSISTENCY, callId);
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
               getId(), previous, ServerProtoUtils.toString(reply));
@@ -937,33 +938,28 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       futures = state.getLog().append(entries);
 
       state.updateConfiguration(entries);
-      state.updateStatemachine(leaderCommit, currentTerm);
 
-      commitInfos.stream().forEach(c -> commitInfoCache.update(c));
+      commitInfos.forEach(commitInfoCache::update);
     }
     if (entries.length > 0) {
       CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null);
-      nextIndex = entries[entries.length - 1].getIndex() + 1;
     }
-    final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
-        leaderId, getId(), groupId, currentTerm, nextIndex, SUCCESS, callId);
-    logAppendEntries(isHeartbeat,
-        () -> getId() + ": succeeded to handle AppendEntries. Reply: "
-            + ServerProtoUtils.toString(reply));
-    return JavaUtils.allOf(futures)
-        .thenApply(v -> {
-          // reset election timer to avoid punishing the leader for our own
-          // long disk writes
-          synchronized (this) {
-            if (lifeCycle.getCurrentState() == RUNNING && isFollower()
-                && getState().getCurrentTerm() == currentTerm) {
-              // reset election timer to avoid punishing the leader for our own
-              // long disk writes
-              heartbeatMonitor.updateLastRpcTime(false);
-            }
-          }
-          return reply;
-        });
+    return JavaUtils.allOf(futures).thenApply(v -> {
+      final AppendEntriesReplyProto reply;
+      synchronized(this) {
+        if (lifeCycle.getCurrentState() == RUNNING && isFollower()
+            && getState().getCurrentTerm() == currentTerm) {
+          // reset election timer to avoid punishing the leader for our own long disk writes
+          heartbeatMonitor.updateLastRpcTime(false);
+        }
+        state.updateStatemachine(leaderCommit, currentTerm);
+        reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(), 
groupId, currentTerm,
+            state.getLog().getLastCommittedIndex(), 
state.getLog().getNextIndex(), SUCCESS, callId);
+      }
+      logAppendEntries(isHeartbeat, () ->
+          getId() + ": succeeded to handle AppendEntries. Reply: " + 
ServerProtoUtils.toString(reply));
+      return reply;
+    });
   }
 
   private boolean containPrevious(TermIndex previous) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3845bc3/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 518db7c..53df265 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -76,7 +76,8 @@ public class ServerProtoUtils {
 
   public static String toString(AppendEntriesReplyProto reply) {
     return toString(reply.getServerReply()) + "," + reply.getResult()
-        + ",nextIndex:" + reply.getNextIndex() + ",term:" + reply.getTerm();
+        + ",nextIndex:" + reply.getNextIndex() + ",term:" + reply.getTerm()
+        + ",followerCommit" + reply.getFollowerCommit();
   }
 
   private static String toString(RaftRpcReplyProto reply) {
@@ -175,7 +176,7 @@ public class ServerProtoUtils {
 
   public static AppendEntriesReplyProto toAppendEntriesReplyProto(
       RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long 
term,
-      long nextIndex, AppendResult result, long callId) {
+      long followerCommit, long nextIndex, AppendResult result, long callId) {
     RaftRpcReplyProto.Builder rpcReply = toRaftRpcReplyProtoBuilder(
         requestorId, replyId, groupId, result == AppendResult.SUCCESS)
         .setCallId(callId);
@@ -183,6 +184,7 @@ public class ServerProtoUtils {
         .setServerReply(rpcReply)
         .setTerm(term)
         .setNextIndex(nextIndex)
+        .setFollowerCommit(followerCommit)
         .setResult(result).build();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3845bc3/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 2adef40..b5a38fd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -94,8 +94,9 @@ public abstract class RaftLog implements Closeable {
         // paper for details.
         final TermIndex entry = getTermIndex(majorityIndex);
         if (entry != null && entry.getTerm() == currentTerm) {
-          LOG.debug("{}: Updating lastCommitted to {}", selfId, majorityIndex);
-          lastCommitted.set(majorityIndex);
+          final long commitIndex = Math.min(majorityIndex, 
getLatestFlushedIndex());
+          LOG.debug("{}: Updating lastCommitted to {}", selfId, commitIndex);
+          lastCommitted.set(commitIndex);
           return true;
         }
       }

Reply via email to