Repository: incubator-ratis
Updated Branches:
  refs/heads/master 3e69b13a2 -> 4104860b9


RATIS-205. Return commit information to client.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/4104860b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/4104860b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/4104860b

Branch: refs/heads/master
Commit: 4104860b9a1b79b997df4e39c3c70c7008422da4
Parents: 3e69b13
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Thu Feb 15 14:49:00 2018 +0800
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Thu Feb 15 14:49:00 2018 +0800

----------------------------------------------------------------------
 .../ratis/client/impl/ClientProtoUtils.java     | 10 ++-
 .../java/org/apache/ratis/protocol/Message.java |  4 +
 .../apache/ratis/protocol/RaftClientReply.java  | 46 +++++++---
 .../org/apache/ratis/protocol/RaftGroup.java    | 31 ++++---
 .../ratis/protocol/ServerInformationReply.java  | 30 +++----
 .../java/org/apache/ratis/util/ProtoUtils.java  | 23 +++++
 .../ratis/grpc/server/GRpcLogAppender.java      |  2 +
 ratis-proto-shaded/src/main/proto/Raft.proto    | 20 +++--
 .../ratis/server/impl/CommitInfoCache.java      | 52 +++++++++++
 .../apache/ratis/server/impl/FollowerInfo.java  | 21 ++++-
 .../apache/ratis/server/impl/LeaderState.java   | 15 ++++
 .../apache/ratis/server/impl/LogAppender.java   | 20 +++--
 .../ratis/server/impl/PendingRequest.java       |  6 +-
 .../ratis/server/impl/PendingRequests.java      |  2 +-
 .../ratis/server/impl/RaftServerImpl.java       | 90 ++++++++++++--------
 .../ratis/server/impl/RaftServerProxy.java      | 26 ++----
 .../ratis/server/impl/ServerProtoUtils.java     |  9 +-
 .../apache/ratis/server/impl/ServerState.java   | 37 ++++----
 .../apache/ratis/server/storage/RaftLog.java    |  5 +-
 .../org/apache/ratis/RaftExceptionBaseTest.java |  2 +-
 .../server/impl/ServerInformationBaseTest.java  | 74 ++++++++++++++--
 21 files changed, 374 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 861dba6..9148633 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -22,7 +22,6 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.ReflectionUtils;
-import org.apache.ratis.util.StringUtils;
 
 import java.util.Arrays;
 
@@ -109,6 +108,7 @@ public interface ClientProtoUtils {
       if (reply.getMessage() != null) {
         b.setMessage(toClientMessageEntryProtoBuilder(reply.getMessage()));
       }
+      ProtoUtils.addCommitInfos(reply.getCommitInfos(), i -> b.addCommitInfos(i));
 
       final NotLeaderException nle = reply.getNotLeaderException();
       final StateMachineException sme;
@@ -146,6 +146,7 @@ public interface ClientProtoUtils {
       if (reply.getRaftGroupId() != null) {
         b.setGroup(ProtoUtils.toRaftGroupProtoBuilder(reply.getGroup()));
       }
+      ProtoUtils.addCommitInfos(reply.getCommitInfos(), i -> b.addCommitInfos(i));
     }
     return b.build();
   }
@@ -172,7 +173,8 @@ public interface ClientProtoUtils {
     final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId());
     return new RaftClientReply(clientId, RaftPeerId.valueOf(rp.getReplyId()),
         groupId, rp.getCallId(), rp.getSuccess(),
-        toMessage(replyProto.getMessage()), e);
+        toMessage(replyProto.getMessage()), e,
+        replyProto.getCommitInfosList());
   }
 
   static ServerInformationReply toServerInformationReply(
@@ -182,8 +184,8 @@ public interface ClientProtoUtils {
     final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId());
     final RaftGroup raftGroup = ProtoUtils.toRaftGroup(replyProto.getGroup());
     return new ServerInformationReply(clientId, RaftPeerId.valueOf(rp.getReplyId()),
-        groupId, rp.getCallId(), rp.getSuccess(), null,
-        null, raftGroup);
+        groupId, rp.getCallId(), rp.getSuccess(),
+        replyProto.getCommitInfosList(), raftGroup);
   }
 
   static StateMachineException wrapStateMachineException(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
index 4efd29c..9e8198e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
@@ -47,6 +47,10 @@ public interface Message {
     return valueOf(bytes, () -> "Message:" + StringUtils.bytes2HexShortString(bytes));
   }
 
+  static Message valueOf(String string) {
+    return valueOf(ByteString.copyFromUtf8(string), () -> "Message:" + string);
+  }
+
   Message EMPTY = valueOf(ByteString.EMPTY);
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 77a987d..af64d66 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -17,10 +17,16 @@
  */
 package org.apache.ratis.protocol;
 
+import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.ReflectionUtils;
 
+import java.util.Collection;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
 /**
  * Reply from server to client
  */
@@ -36,9 +42,13 @@ public class RaftClientReply extends RaftClientMessage {
   private final RaftException exception;
   private final Message message;
 
-  public RaftClientReply(ClientId clientId, RaftPeerId serverId,
-      RaftGroupId groupId, long callId, boolean success, Message message,
-      RaftException exception) {
+  /** The commit information when the reply is created. */
+  private final Collection<CommitInfoProto> commitInfos;
+
+  public RaftClientReply(
+      ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
+      long callId, boolean success, Message message, RaftException exception,
+      Collection<CommitInfoProto> commitInfos) {
     super(clientId, serverId, groupId);
     this.success = success;
     this.callId = callId;
@@ -52,17 +62,32 @@ public class RaftClientReply extends RaftClientMessage {
           ReflectionUtils.isInstance(exception, NotLeaderException.class, StateMachineException.class),
           () -> "Unexpected exception class: " + this);
     }
+
+    this.commitInfos = commitInfos;
   }
 
-  public RaftClientReply(RaftClientRequest request,
-      RaftException exception) {
+  public RaftClientReply(RaftClientRequest request, RaftException exception, Collection<CommitInfoProto> commitInfos) {
     this(request.getClientId(), request.getServerId(), request.getRaftGroupId(),
-        request.getCallId(), false, null, exception);
+        request.getCallId(), false, null, exception, commitInfos);
+  }
+
+  public RaftClientReply(RaftClientRequest request, Collection<CommitInfoProto> commitInfos) {
+    this(request, (Message) null, commitInfos);
   }
 
-  public RaftClientReply(RaftClientRequest request, Message message) {
+  public RaftClientReply(RaftClientRequest request, Message message, Collection<CommitInfoProto> commitInfos) {
     this(request.getClientId(), request.getServerId(), request.getRaftGroupId(),
-        request.getCallId(), true, message, null);
+        request.getCallId(), true, message, null, commitInfos);
+  }
+
+  /**
+   * Get the commit information for the entire group.
+   * The commit information may be unavailable for exception reply.
+   *
+   * @return the commit information if it is available; otherwise, return null.
+   */
+  public Collection<CommitInfoProto> getCommitInfos() {
+    return commitInfos;
   }
 
   @Override
@@ -76,8 +101,9 @@ public class RaftClientReply extends RaftClientMessage {
 
   @Override
   public String toString() {
-    return super.toString() + ", cid=" + getCallId()
-        + ", success? " + isSuccess() + ", exception=" + exception;
+    return super.toString() + ", cid=" + getCallId() + ", "
+        + (isSuccess()? "SUCCESS":  "FAILED " + exception)
+        + ", commits" + ProtoUtils.toString(commitInfos);
   }
 
   public boolean isSuccess() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
index f4a16c4..c119e32 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
@@ -17,8 +17,6 @@
  */
 package org.apache.ratis.protocol;
 
-import org.apache.ratis.util.Preconditions;
-
 import java.util.*;
 
 /**
@@ -26,7 +24,7 @@ import java.util.*;
  * peers.
  */
 public class RaftGroup {
-  private static RaftGroup EMPTY_GROUP = new RaftGroup(RaftGroupId.emptyGroupId(), Collections.emptyList());
+  private static RaftGroup EMPTY_GROUP = new RaftGroup(RaftGroupId.emptyGroupId());
 
   public static RaftGroup emptyGroup() {
     return EMPTY_GROUP;
@@ -35,32 +33,43 @@ public class RaftGroup {
   /** UTF-8 string as id */
   private final RaftGroupId groupId;
   /** The group of raft peers */
-  private final List<RaftPeer> peers;
+  private final Map<RaftPeerId, RaftPeer> peers;
 
   public RaftGroup(RaftGroupId groupId) {
     this(groupId, Collections.emptyList());
   }
 
-  public RaftGroup(RaftGroupId groupId, RaftPeer[] peers) {
+  public RaftGroup(RaftGroupId groupId, RaftPeer... peers) {
     this(groupId, Arrays.asList(peers));
   }
 
   public RaftGroup(RaftGroupId groupId, Collection<RaftPeer> peers) {
-    Preconditions.assertTrue(peers != null);
-    this.groupId = groupId;
-    this.peers = Collections.unmodifiableList(new ArrayList<>(peers));
+    this.groupId = Objects.requireNonNull(groupId, "groupId == null");
+
+    if (peers == null || peers.isEmpty()) {
+      this.peers = Collections.emptyMap();
+    } else {
+      final Map<RaftPeerId, RaftPeer> map = new HashMap<>();
+      peers.stream().forEach(p -> map.put(p.getId(), p));
+      this.peers = Collections.unmodifiableMap(map);
+    }
   }
 
   public RaftGroupId getGroupId() {
     return groupId;
   }
 
-  public List<RaftPeer> getPeers() {
-    return peers;
+  public Collection<RaftPeer> getPeers() {
+    return peers.values();
+  }
+
+  /** @return the peer with the given id if it is in this group; otherwise, return null. */
+  public RaftPeer getPeer(RaftPeerId id) {
+    return peers.get(Objects.requireNonNull(id, "id == null"));
   }
 
   @Override
   public String toString() {
-    return groupId + ":" + peers;
+    return groupId + ":" + peers.values();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java
index d4257e1..d06b251 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java
@@ -17,34 +17,30 @@
  */
 package org.apache.ratis.protocol;
 
+import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
+
+import java.util.Collection;
+
 /**
  * The response of server information request. Sent from server to client.
- *
- * TODO : currently, only information returned is the info of the group the
- * server belongs to.
  */
 public class ServerInformationReply extends RaftClientReply {
-  RaftGroup group;
+  private final RaftGroup group;
 
-  public ServerInformationReply(RaftClientRequest request, Message message,
-      RaftGroup group) {
-    super(request, message);
+  public ServerInformationReply(
+      RaftClientRequest request, Collection<CommitInfoProto> commitInfos, RaftGroup group) {
+    super(request, commitInfos);
     this.group = group;
   }
 
-  public ServerInformationReply(RaftClientRequest request,
-      RaftException ex) {
-    super(request, ex);
+  public ServerInformationReply(
+      ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
+      long callId, boolean success, Collection<CommitInfoProto> commitInfos, RaftGroup group) {
+    super(clientId, serverId, groupId, callId, success, null, null, commitInfos);
+    this.group = group;
   }
 
   public RaftGroup getGroup() {
     return group;
   }
-
-  public ServerInformationReply(ClientId clientId, RaftPeerId serverId,
-      RaftGroupId groupId, long callId, boolean success, Message message,
-      RaftException exception, RaftGroup group) {
-    super(clientId, serverId, groupId, callId, success, message, exception);
-    this.group = group;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 37a538d..d3b8fcf 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -29,6 +29,8 @@ import java.io.ObjectOutputStream;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 public interface ProtoUtils {
   static ByteString writeObject2ByteString(Object obj) {
@@ -124,6 +126,27 @@ public interface ProtoUtils {
         .addAllPeers(toRaftPeerProtos(group.getPeers()));
   }
 
+  static CommitInfoProto toCommitInfoProto(RaftPeer peer, long commitIndex) {
+    return CommitInfoProto.newBuilder()
+        .setServer(toRaftPeerProto(peer))
+        .setCommitIndex(commitIndex)
+        .build();
+  }
+
+  static void addCommitInfos(Collection<CommitInfoProto> commitInfos, Consumer<CommitInfoProto> adder) {
+    if (commitInfos != null && !commitInfos.isEmpty()) {
+      commitInfos.stream().forEach(i -> adder.accept(i));
+    }
+  }
+
+  static String toString(CommitInfoProto proto) {
+    return RaftPeerId.valueOf(proto.getServer().getId()) + ":c" + proto.getCommitIndex();
+  }
+
+  static String toString(Collection<CommitInfoProto> protos) {
+    return protos.stream().map(ProtoUtils::toString).collect(Collectors.toList()).toString();
+  }
+
   static boolean isConfigurationLogEntry(LogEntryProto entry) {
     return entry.getLogEntryBodyCase() ==
         LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index cb13561..19389bd 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -263,6 +263,8 @@ public class GRpcLogAppender extends LogAppender {
 
   private void onSuccess(AppendEntriesReplyProto reply) {
     AppendEntriesRequestProto request = pendingRequests.poll();
+    updateCommitIndex(request.getLeaderCommit());
+
     final long replyNextIndex = reply.getNextIndex();
     Objects.requireNonNull(request,
         () -> "Got reply with next index " + replyNextIndex

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index fea6bd0..fa823ab 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -118,6 +118,11 @@ message RequestVoteReplyProto {
   bool shouldShutdown = 3;
 }
 
+message CommitInfoProto {
+  RaftPeerProto server = 1;
+  uint64 commitIndex = 2;
+}
+
 message AppendEntriesRequestProto {
   RaftRpcRequestProto serverRequest = 1;
   uint64 leaderTerm = 2;
@@ -125,6 +130,8 @@ message AppendEntriesRequestProto {
   repeated LogEntryProto entries = 4;
   uint64 leaderCommit = 5;
   bool initializing = 6;
+
+  repeated CommitInfoProto commitInfos = 15;
 }
 
 message AppendEntriesReplyProto {
@@ -189,11 +196,8 @@ message RaftClientReplyProto {
     NotLeaderExceptionProto notLeaderException = 3;
     StateMachineExceptionProto stateMachineException = 4;
   }
-}
 
-message ServerInformationReplyProto {
-  RaftRpcReplyProto rpcReply = 1;
-  RaftGroupProto group = 2;
+  repeated CommitInfoProto commitInfos = 15;
 }
 
 // setConfiguration request
@@ -211,4 +215,10 @@ message ReinitializeRequestProto {
 // server info request
 message ServerInformationRequestProto {
   RaftRpcRequestProto rpcRequest = 1;
-}
\ No newline at end of file
+}
+
+message ServerInformationReplyProto {
+  RaftRpcReplyProto rpcReply = 1;
+  RaftGroupProto group = 2;
+  repeated CommitInfoProto commitInfos = 3;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
new file mode 100644
index 0000000..e74ff9c
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.util.ProtoUtils;
+
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/** Caching the commit information. */
+class CommitInfoCache {
+  private final ConcurrentMap<RaftPeerId, CommitInfoProto> map = new ConcurrentHashMap<>();
+
+  CommitInfoProto get(RaftPeerId id) {
+    return map.get(id);
+  }
+
+  CommitInfoProto update(RaftPeer peer, long newCommitIndex) {
+    Objects.requireNonNull(peer, "peer == null");
+    return map.compute(peer.getId(), (id, old) ->
+        old == null || newCommitIndex > old.getCommitIndex()? ProtoUtils.toCommitInfoProto(peer, newCommitIndex): old);
+  }
+
+  CommitInfoProto update(CommitInfoProto newInfo) {
+    return map.compute(RaftPeerId.valueOf(newInfo.getServer().getId()),
+        (id, old) -> old == null || newInfo.getCommitIndex() > old.getCommitIndex()? newInfo: old);
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + ":" + map.values();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index f72e037..246b9df 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -17,18 +17,21 @@
  */
 package org.apache.ratis.server.impl;
 
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.Timestamp;
 
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
 public class FollowerInfo {
   private final RaftPeer peer;
   private final AtomicReference<Timestamp> lastRpcResponseTime;
   private final AtomicReference<Timestamp> lastRpcSendTime;
   private long nextIndex;
   private final AtomicLong matchIndex;
+  private final AtomicLong commitIndex = new AtomicLong(RaftServerConstants.INVALID_LOG_INDEX);
   private volatile boolean attendVote;
 
   FollowerInfo(RaftPeer peer, Timestamp lastRpcTime, long nextIndex,
@@ -49,6 +52,18 @@ public class FollowerInfo {
     return matchIndex.get();
   }
 
+  /** @return the commit index acked by the follower. */
+  long getCommitIndex() {
+    return commitIndex.get();
+  }
+
+  boolean updateCommitIndex(long newCommitIndex) {
+    final long old = commitIndex.getAndUpdate(oldCommitIndex -> newCommitIndex);
+    Preconditions.assertTrue(newCommitIndex >= old,
+        () -> "newCommitIndex = " + newCommitIndex + " < old = " + old);
+    return old != newCommitIndex;
+  }
+
   public synchronized long getNextIndex() {
     return nextIndex;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index a3974d5..840df08 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -22,6 +22,8 @@ import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.shaded.proto.RaftProtos.LeaderNoOp;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.TransactionContext;
@@ -258,6 +260,19 @@ public class LeaderState {
     server.getState().setRaftConf(logIndex, newConf);
   }
 
+  void updateFollowerCommitInfos(CommitInfoCache cache, List<CommitInfoProto> protos) {
+    senders.stream().map(LogAppender::getFollower)
+        .map(f -> cache.update(f.getPeer(), f.getCommitIndex()))
+        .forEach(protos::add);
+  }
+
+  AppendEntriesRequestProto newAppendEntriesRequestProto(
+      RaftPeerId targetId, TermIndex previous, List<LogEntryProto> entries, boolean initializing) {
+    return ServerProtoUtils.toAppendEntriesRequestProto(server.getId(), targetId,
+        server.getGroupId(), currentTerm, entries, raftLog.getLastCommittedIndex(),
+        initializing, previous, server.getCommitInfos());
+  }
+
   /**
    * After receiving a setConfiguration request, the leader should update its
    * RpcSender list.

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 556ba83..ed4e843 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.LeaderState.StateUpdateEventType;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -57,7 +58,6 @@ public class LogAppender extends Daemon {
   private final int maxBufferSize;
   private final boolean batchSending;
   private final LogEntryBuffer buffer;
-  private final long leaderTerm;
   private final int snapshotChunkMaxSize;
 
   private volatile boolean sending = true;
@@ -74,7 +74,6 @@ public class LogAppender extends Daemon {
     this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
 
     this.buffer = new LogEntryBuffer();
-    this.leaderTerm = server.getState().getCurrentTerm();
   }
 
   @Override
@@ -107,6 +106,10 @@ public class LogAppender extends Daemon {
     return follower;
   }
 
+  RaftPeerId getFollowerId() {
+    return getFollower().getPeer().getId();
+  }
+
   /**
    * A buffer for log entries with size limitation.
    */
@@ -135,9 +138,8 @@ public class LogAppender extends Daemon {
     }
 
     AppendEntriesRequestProto getAppendRequest(TermIndex previous) {
-      final AppendEntriesRequestProto request = server
-          .createAppendEntriesRequest(leaderTerm, follower.getPeer().getId(),
-              previous, buf, !follower.isAttendingVote());
+      final AppendEntriesRequestProto request = leaderState.newAppendEntriesRequestProto(
+          getFollowerId(), previous, buf, !follower.isAttendingVote());
       buf.clear();
       totalSize = 0;
       return request;
@@ -209,10 +211,10 @@ public class LogAppender extends Daemon {
         }
 
         follower.updateLastRpcSendTime();
-        final AppendEntriesReplyProto r = server.getServerRpc()
-            .appendEntries(request);
+        final AppendEntriesReplyProto r = server.getServerRpc().appendEntries(request);
         follower.updateLastRpcResponseTime();
 
+        updateCommitIndex(request.getLeaderCommit());
         return r;
       } catch (InterruptedIOException | RaftLogIOException e) {
         throw e;
@@ -227,6 +229,10 @@ public class LogAppender extends Daemon {
     return null;
   }
 
+  protected void updateCommitIndex(long commitIndex) {
+    follower.updateCommitIndex(commitIndex);
+  }
+
   protected class SnapshotRequestIter
       implements Iterable<InstallSnapshotRequestProto> {
     private final SnapshotInfo snapshot;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index 95731c5..b63cd01 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -71,14 +71,10 @@ public class PendingRequest implements Comparable<PendingRequest> {
   }
 
   TransactionContext setNotLeaderException(NotLeaderException nle) {
-    setReply(new RaftClientReply(getRequest(), nle));
+    setReply(new RaftClientReply(getRequest(), nle, null));
     return getEntry();
   }
 
-  void setSuccessReply(Message message) {
-    setReply(new RaftClientReply(getRequest(), message));
-  }
-
   @Override
   public int compareTo(PendingRequest that) {
     return Long.compare(this.index, that.index);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 1dead9d..c9c66a5 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -113,7 +113,7 @@ class PendingRequests {
     if (pendingSetConf != null) {
       // for setConfiguration we do not need to wait for statemachine. send back
       // reply after it's committed.
-      pendingSetConf.setSuccessReply(null);
+      pendingSetConf.setReply(new RaftClientReply(pendingSetConf.getRequest(), server.getCommitInfos()));
       pendingSetConf = null;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 423837b..37cd16f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -40,12 +40,8 @@ import org.slf4j.LoggerFactory;
 import javax.management.ObjectName;
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
+import java.util.concurrent.*;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -89,6 +85,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
   private volatile LeaderState leaderState;
 
   private final RetryCache retryCache;
+  private final CommitInfoCache commitInfoCache = new CommitInfoCache();
 
   private final RaftServerJmxAdapter jmxAdapter;
 
@@ -102,7 +99,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
         "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
     this.proxy = proxy;
-    this.state = new ServerState(id, group, properties, this, proxy.getStateMachine());
+
+    final RaftPeer peer = new RaftPeer(id, proxy.getServerRpc().getInetSocketAddress());
+    this.state = new ServerState(peer, group, properties, this, proxy.getStateMachine());
     this.retryCache = initRetryCache(properties);
 
     this.jmxAdapter = new RaftServerJmxAdapter();
@@ -340,6 +339,31 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     heartbeatMonitor = null;
   }
 
+  Collection<CommitInfoProto> getCommitInfos() {
+    final List<CommitInfoProto> infos = new ArrayList<>();
+    // add the commit info of this server
+    infos.add(state.updateCommitInfo(commitInfoCache));
+
+    // add the commit infos of other servers
+    if (isLeader()) {
+      Optional.of(leaderState).ifPresent(
+          leader -> leader.updateFollowerCommitInfos(commitInfoCache, infos));
+    } else {
+      getRaftConf().getPeers().stream()
+          .filter(p -> !p.getId().equals(state.getSelfId()))
+          .map(RaftPeer::getId)
+          .map(commitInfoCache::get)
+          .filter(i -> i != null)
+          .forEach(infos::add);
+    }
+    return infos;
+  }
+
+  ServerInformationReply getServerInformation(ServerInformatonRequest request) {
+    final RaftGroup group = new RaftGroup(groupId, getRaftConf().getPeers());
+    return new ServerInformationReply(request, getCommitInfos(), group);
+  }
+
   synchronized void changeToCandidate() {
     Preconditions.assertTrue(isFollower());
     shutdownHeartbeatMonitor();
@@ -368,7 +392,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
 
     if (!isLeader()) {
       NotLeaderException exception = generateNotLeaderException();
-      final RaftClientReply reply = new RaftClientReply(request, exception);
+      final RaftClientReply reply = new RaftClientReply(request, exception, getCommitInfos());
       return RetryCache.failWithReply(reply, entry);
     } else if (leaderState == null || !leaderState.isReady()) {
       RetryCache.CacheEntry cacheEntry = retryCache.get(request.getClientId(), request.getCallId());
@@ -435,7 +459,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       } catch (StateMachineException e) {
         // the StateMachineException is thrown by the SM in the preAppend stage.
         // Return the exception in a RaftClientReply.
-        RaftClientReply exceptionReply = new RaftClientReply(request, e);
+        RaftClientReply exceptionReply = new RaftClientReply(request, e, getCommitInfos());
         cacheEntry.failWithReply(exceptionReply);
         return CompletableFuture.completedFuture(exceptionReply);
       }
@@ -463,7 +487,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       // TODO: We might not be the leader anymore by the time this completes.
       // See the RAFT paper section 8 (last part)
       return stateMachine.query(request.getMessage())
-          .thenApply(r -> new RaftClientReply(request, r));
+          .thenApply(r -> new RaftClientReply(request, r, getCommitInfos()));
     }
 
     // query the retry cache
@@ -482,7 +506,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     TransactionContext context = stateMachine.startTransaction(request);
     if (context.getException() != null) {
       RaftClientReply exceptionReply = new RaftClientReply(request,
-          new StateMachineException(getId(), context.getException()));
+          new StateMachineException(getId(), context.getException()), getCommitInfos());
       cacheEntry.failWithReply(exceptionReply);
       return CompletableFuture.completedFuture(exceptionReply);
     }
@@ -495,15 +519,15 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return waitForReply(getId(), request, submitClientRequestAsync(request));
   }
 
-  static RaftClientReply waitForReply(RaftPeerId id,
+  RaftClientReply waitForReply(RaftPeerId id,
       RaftClientRequest request, CompletableFuture<RaftClientReply> future)
       throws IOException {
-    return waitForReply(id, request, future, RaftClientReply::new);
+    return waitForReply(id, request, future, e -> new RaftClientReply(request, e, getCommitInfos()));
   }
 
   static <REPLY extends RaftClientReply> REPLY waitForReply(
       RaftPeerId id, RaftClientRequest request, CompletableFuture<REPLY> future,
-      BiFunction<RaftClientRequest, RaftException, REPLY> exceptionReply)
+      Function<RaftException, REPLY> exceptionReply)
       throws IOException {
     try {
       return future.get();
@@ -518,10 +542,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       }
       if (cause instanceof NotLeaderException ||
           cause instanceof StateMachineException) {
-        return exceptionReply.apply(request, (RaftException) cause);
-      } else {
-        throw IOUtils.asIOException(cause);
+        final REPLY reply = exceptionReply.apply((RaftException) cause);
+        if (reply != null) {
+          return reply;
+        }
       }
+      throw IOUtils.asIOException(cause);
     }
   }
 
@@ -556,8 +582,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
 
       final RaftConfiguration current = getRaftConf();
       // make sure there is no other raft reconfiguration in progress
-      if (!current.isStable() || leaderState.inStagingState() ||
-          !state.isCurrentConfCommitted()) {
+      if (!current.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) {
         throw new ReconfigurationInProgressException(
             "Reconfiguration is already in progress: " + current);
       }
@@ -565,7 +590,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       // return success with a null message if the new conf is the same as the current
       if (current.hasNoChange(peersInNewConf)) {
         pending = new PendingRequest(request);
-        pending.setSuccessReply(null);
+        pending.setReply(new RaftClientReply(request, getCommitInfos()));
         return pending.getFuture();
       }
 
@@ -709,7 +734,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return appendEntriesAsync(RaftPeerId.valueOf(request.getRequestorId()),
         ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
         r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(),
-        entries);
+        r.getCommitInfosList(), entries);
   }
 
   static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) {
@@ -727,14 +752,15 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
   private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
       RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
       TermIndex previous, long leaderCommit, boolean initializing,
-      LogEntryProto... entries) throws IOException {
+      List<CommitInfoProto> commitInfos, LogEntryProto... entries) throws IOException {
     CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
         leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
     final boolean isHeartbeat = entries.length == 0;
     logAppendEntries(isHeartbeat,
         () -> getId() + ": receive appendEntries(" + leaderId + ", " + leaderGroupId + ", "
-            + leaderTerm + ", " + previous + ", " + leaderCommit + ", "
-            + initializing + ServerProtoUtils.toString(entries));
+            + leaderTerm + ", " + previous + ", " + leaderCommit + ", " + initializing
+            + ", commits" + ProtoUtils.toString(commitInfos)
+            + ", entries: " + ServerProtoUtils.toString(entries));
 
     assertLifeCycleState(STARTING, RUNNING);
     assertGroup(leaderId, leaderGroupId);
@@ -792,6 +818,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
 
       state.updateConfiguration(entries);
       state.updateStatemachine(leaderCommit, currentTerm);
+
+      commitInfos.stream().forEach(c -> commitInfoCache.update(c));
     }
     if (entries.length > 0) {
       CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null);
@@ -894,14 +922,6 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
         currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
   }
 
-  AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm,
-      RaftPeerId targetId, TermIndex previous, List<LogEntryProto> entries,
-      boolean initializing) {
-    return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId, groupId,
-        leaderTerm, entries, state.getLog().getLastCommittedIndex(),
-        initializing, previous);
-  }
-
   synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
       RaftPeerId targetId, String requestId, int requestIndex,
       SnapshotInfo snapshot, List<FileChunkProto> chunks, boolean done) {
@@ -947,12 +967,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return stateMachineFuture.whenComplete((reply, exception) -> {
       final RaftClientReply r;
       if (exception == null) {
-        r = new RaftClientReply(clientId, serverId, groupId, callId, true, reply, null);
+        r = new RaftClientReply(clientId, serverId, groupId, callId, true, reply, null, getCommitInfos());
       } else {
         // the exception is coming from the state machine. wrap it into the
         // reply as a StateMachineException
         final StateMachineException e = new StateMachineException(getId(), exception);
-        r = new RaftClientReply(clientId, serverId, groupId, callId, false, null, e);
+        r = new RaftClientReply(clientId, serverId, groupId, callId, false, null, e, getCommitInfos());
       }
       // update retry cache
       cacheEntry.updateResult(r);
@@ -1011,7 +1031,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       final RetryCache.CacheEntry cacheEntry = getRetryCache().get(clientId, logEntry.getCallId());
       if (cacheEntry != null) {
         final RaftClientReply reply = new RaftClientReply(clientId, getId(), getGroupId(),
-            logEntry.getCallId(), false, null, generateNotLeaderException());
+            logEntry.getCallId(), false, null, generateNotLeaderException(), getCommitInfos());
         cacheEntry.failWithReply(reply);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 071f65c..760cf70 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -161,7 +160,8 @@ public class RaftServerProxy implements RaftServer {
 
   @Override
   public RaftClientReply reinitialize(ReinitializeRequest request) throws IOException {
-    return RaftServerImpl.waitForReply(getId(), request, reinitializeAsync(request));
+    return RaftServerImpl.waitForReply(getId(), request, reinitializeAsync(request),
+        e -> new RaftClientReply(request, e, null));
   }
 
   @Override
@@ -187,13 +187,13 @@ public class RaftServerProxy implements RaftServer {
               "Failed to reinitialize, request=" + request, ioe);
           impl.completeExceptionally(new IOException(
               "Server " + getId() + " is not initialized.", re));
-          return new RaftClientReply(request, re);
+          return new RaftClientReply(request, re, null);
         }
 
         getServerRpc().addPeers(request.getGroup().getPeers());
         newImpl.start();
         impl.complete(newImpl);
-        return new RaftClientReply(request, (Message) null);
+        return new RaftClientReply(request, newImpl.getCommitInfos());
       } finally {
         reinitializeRequest.set(null);
       }
@@ -204,25 +204,13 @@ public class RaftServerProxy implements RaftServer {
   public ServerInformationReply getInfo(ServerInformatonRequest request)
       throws IOException {
     return RaftServerImpl.waitForReply(getId(), request, getInfoAsync(request),
-        ServerInformationReply::new);
+        r -> null);
   }
 
   @Override
   public CompletableFuture<ServerInformationReply> getInfoAsync(
-      ServerInformatonRequest request) throws IOException {
-    return CompletableFuture.supplyAsync(() -> {
-      try {
-        RaftServerImpl server = impl.get();
-        Collection<RaftPeer> peers = server.getRaftConf().getPeers();
-        RaftGroupId groupId = server.getGroupId();
-        RaftGroup group = new RaftGroup(groupId, peers);
-        return new ServerInformationReply(request, null, group);
-      } catch (Exception e) {
-        final RaftException re = new RaftException(
-            "Failed to get info, request=" + request, e);
-        return new ServerInformationReply(request, re);
-      }
-    });
+      ServerInformatonRequest request) {
+    return impl.thenApply(server -> server.getServerInformation(request));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 3e44c76..cfdf4cf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -21,20 +21,18 @@ import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
 import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.*;
 import org.apache.ratis.util.ProtoUtils;
 
-
 /** Server proto utilities for internal use. */
 public class ServerProtoUtils {
   public static TermIndex toTermIndex(TermIndexProto p) {
@@ -187,7 +185,7 @@ public class ServerProtoUtils {
   public static AppendEntriesRequestProto toAppendEntriesRequestProto(
       RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long leaderTerm,
       List<LogEntryProto> entries, long leaderCommit, boolean initializing,
-      TermIndex previous) {
+      TermIndex previous, Collection<CommitInfoProto> commitInfos) {
     final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto
         .newBuilder()
         .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId, groupId))
@@ -201,6 +199,7 @@ public class ServerProtoUtils {
     if (previous != null) {
       b.setPreviousLog(toTermIndexProto(previous));
     }
+    ProtoUtils.addCommitInfos(commitInfos, i -> b.addCommitInfos(i));
     return b.build();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index b50393e..6354e73 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -18,13 +18,11 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.*;
+import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.SnapshotInfo;
@@ -46,6 +44,7 @@ import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBod
  */
 public class ServerState implements Closeable {
   private final RaftPeerId selfId;
+  private final RaftPeer peer;
   private final RaftServerImpl server;
   /** Raft log */
   private final RaftLog log;
@@ -79,10 +78,11 @@ public class ServerState implements Closeable {
    */
   private TermIndex latestInstalledSnapshot;
 
-  ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop,
+  ServerState(RaftPeer peer, RaftGroup group, RaftProperties prop,
               RaftServerImpl server, StateMachine stateMachine)
       throws IOException {
-    this.selfId = id;
+    this.selfId = peer.getId();
+    this.peer = peer;
     this.server = server;
     RaftConfiguration initialConf = RaftConfiguration.newBuilder()
         .setConf(group.getPeers()).build();
@@ -91,14 +91,14 @@ public class ServerState implements Closeable {
     final File dir = RaftServerConfigKeys.storageDir(prop);
     storage = new RaftStorage(new File(dir, group.getGroupId().toString()),
         RaftServerConstants.StartupOption.REGULAR);
-    snapshotManager = new SnapshotManager(storage, id);
+    snapshotManager = new SnapshotManager(storage, peer.getId());
 
     long lastApplied = initStatemachine(stateMachine, prop);
 
     leaderId = null;
     // we cannot apply log entries to the state machine in this step, since we
     // do not know whether the local log entries have been committed.
-    log = initLog(id, prop, lastApplied, entry -> {
+    log = initLog(peer.getId(), prop, lastApplied, entry -> {
       if (entry.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
         configurationManager.addConfiguration(entry.getIndex(),
             ServerProtoUtils.toRaftConfiguration(entry.getIndex(),
@@ -163,6 +163,10 @@ public class ServerState implements Closeable {
     return this.selfId;
   }
 
+  CommitInfoProto updateCommitInfo(CommitInfoCache cache) {
+    return cache.update(peer, log.getLastCommittedIndex());
+  }
+
   public long getCurrentTerm() {
     return currentTerm;
   }
@@ -304,15 +308,16 @@ public class ServerState implements Closeable {
     }
   }
 
-  void updateStatemachine(long majorityIndex, long currentTerm) {
-    log.updateLastCommitted(majorityIndex, currentTerm);
-    stateMachineUpdater.notifyUpdater();
+  boolean updateStatemachine(long majorityIndex, long currentTerm) {
+    if (log.updateLastCommitted(majorityIndex, currentTerm)) {
+      stateMachineUpdater.notifyUpdater();
+      return true;
+    }
+    return false;
   }
 
-  void reloadStateMachine(long lastIndexInSnapshot, long currentTerm)
-      throws IOException {
+  void reloadStateMachine(long lastIndexInSnapshot, long currentTerm) {
     log.updateLastCommitted(lastIndexInSnapshot, currentTerm);
-
     stateMachineUpdater.reloadStateMachine();
   }
 
@@ -351,8 +356,4 @@ public class ServerState implements Closeable {
   public long getLastAppliedIndex() {
     return stateMachineUpdater.getLastAppliedIndex();
   }
-
-  boolean isCurrentConfCommitted() {
-    return getRaftConf().getLogEntryIndex() <= getLog().getLastCommittedIndex();
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 8edb8a1..91f3b41 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -80,8 +80,9 @@ public abstract class RaftLog implements Closeable {
    * Update the last committed index.
    * @param majorityIndex the index that has achieved majority.
    * @param currentTerm the current term.
+   * @return true if update is applied; otherwise, return false, i.e. no update required.
    */
-  public void updateLastCommitted(long majorityIndex, long currentTerm) {
+  public boolean updateLastCommitted(long majorityIndex, long currentTerm) {
     try(AutoCloseableLock writeLock = writeLock()) {
       if (lastCommitted.get() < majorityIndex) {
         // Only update last committed index for current term. See §5.4.2 in
@@ -90,9 +91,11 @@ public abstract class RaftLog implements Closeable {
         if (entry != null && entry.getTerm() == currentTerm) {
           LOG.debug("{}: Updating lastCommitted to {}", selfId, majorityIndex);
           lastCommitted.set(majorityIndex);
+          return true;
         }
       }
     }
+    return false;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index e69f35d..f90012e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -187,7 +187,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
           GroupMismatchException.class);
 
       testFailureCase("reinitialize(..) with client group being different from the server group",
-          () -> client.reinitialize(anotherGroup, clusterGroup.getPeers().get(0).getId()),
+          () -> client.reinitialize(anotherGroup, clusterGroup.getPeers().iterator().next().getId()),
           GroupMismatchException.class);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java
index c22d4c5..2862e0d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java
@@ -20,17 +20,17 @@ package org.apache.ratis.server.impl;
 import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.ServerInformationReply;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.util.LogUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
+
 import static org.apache.ratis.util.Preconditions.assertTrue;
 
 public abstract class ServerInformationBaseTest<CLUSTER extends MiniRaftCluster>
@@ -43,7 +43,7 @@ public abstract class ServerInformationBaseTest<CLUSTER extends MiniRaftCluster>
 
   @Test
   public void testServerInformation() throws Exception {
-    runTest(5);
+    runTest(3);
   }
 
   private void runTest(int num) throws Exception {
@@ -66,6 +66,62 @@ public abstract class ServerInformationBaseTest<CLUSTER extends MiniRaftCluster>
         assertTrue(sameGroup(group, info.getGroup()));
       }
     }
+
+    final int numMessages = 5;
+    final long maxCommit;
+    {
+      // send some messages and get max commit from the last reply
+      final RaftClientReply reply = sendMessages(numMessages, cluster);
+      maxCommit = reply.getCommitInfos().stream().mapToLong(CommitInfoProto::getCommitIndex).max().getAsLong();
+    }
+    // kill a follower
+    final RaftPeerId killedFollower = cluster.getFollowers().iterator().next().getId();
+    cluster.killServer(killedFollower);
+    {
+      // send more messages and check last reply
+      final RaftClientReply reply = sendMessages(numMessages, cluster);
+      for(CommitInfoProto i : reply.getCommitInfos()) {
+        if (RaftPeerId.valueOf(i.getServer().getId()).equals(killedFollower)) {
+          Assert.assertTrue(i.getCommitIndex() <= maxCommit);
+        } else {
+          Assert.assertTrue(i.getCommitIndex() > maxCommit);
+        }
+      }
+    }
+
+    // check serverInformation
+    for(RaftPeer peer : peers) {
+      if (peer.getId().equals(killedFollower)) {
+        continue;
+      }
+      try(final RaftClient client = cluster.createClient(peer.getId())) {
+        RaftClientReply reply = client.serverInformation(peer.getId());
+        assertTrue(reply instanceof ServerInformationReply);
+        ServerInformationReply info = (ServerInformationReply)reply;
+        assertTrue(sameGroup(group, info.getGroup()));
+        for(CommitInfoProto i : info.getCommitInfos()) {
+          if (RaftPeerId.valueOf(i.getServer().getId()).equals(killedFollower)) {
+            Assert.assertTrue(i.getCommitIndex() <= maxCommit);
+          } else {
+            Assert.assertTrue(i.getCommitIndex() > maxCommit);
+          }
+        }
+      }
+    }
+
+    cluster.shutdown();
+  }
+
+  RaftClientReply sendMessages(int n, MiniRaftCluster cluster) throws Exception {
+    LOG.info("sendMessages: " + n);
+    final RaftPeerId leader = RaftTestUtil.waitForLeader(cluster).getId();
+    RaftClientReply reply = null;
+    try(final RaftClient client = cluster.createClient(leader)) {
+      for(int i = 0; i < n; i++) {
+        reply = client.send(Message.valueOf("m" + i));
+      }
+    }
+    return reply;
   }
 
   private boolean sameGroup(RaftGroup expected, RaftGroup given) {
@@ -73,8 +129,8 @@ public abstract class ServerInformationBaseTest<CLUSTER extends MiniRaftCluster>
         given.getGroupId().toString())) {
       return false;
     }
-    List<RaftPeer> expectedPeers = expected.getPeers();
-    List<RaftPeer> givenPeers = given.getPeers();
+    Collection<RaftPeer> expectedPeers = expected.getPeers();
+    Collection<RaftPeer> givenPeers = given.getPeers();
     if (expectedPeers.size() != givenPeers.size()) {
       return false;
     }

Reply via email to