This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 21d7526  RATIS-515. IllegalStateException while updating matchIndex. 
Contributed by Tsz Wo Nicholas Sze.
21d7526 is described below

commit 21d7526217795b6477e814365c216c9597c226ee
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue May 14 19:31:11 2019 +0530

    RATIS-515. IllegalStateException while updating matchIndex. Contributed by 
Tsz Wo Nicholas Sze.
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 103 ++++++++-------------
 .../org/apache/ratis/server/impl/FollowerInfo.java |   4 +-
 .../org/apache/ratis/server/impl/LogAppender.java  |  23 ++---
 .../apache/ratis/server/impl/RaftServerImpl.java   |  99 +++++++-------------
 .../apache/ratis/server/impl/ServerProtoUtils.java |  18 ++--
 .../org/apache/ratis/server/impl/ServerState.java  |  41 ++++----
 .../org/apache/ratis/server/storage/RaftLog.java   |   8 +-
 .../statemachine/SimpleStateMachine4Testing.java   |   3 +-
 .../ratis/grpc/TestInstallSnapshotWithGrpc.java    |   2 +-
 9 files changed, 128 insertions(+), 173 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index b8926f1..0403ae9 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -84,8 +84,9 @@ public class GrpcLogAppender extends LogAppender {
 
     // clear the pending requests queue and reset the next index of follower
     final long nextIndex = request != null && request.hasPreviousLog()?
-        request.getPreviousLog().getIndex() + 1: raftLog.getStartIndex();
-    clearPendingRequests(nextIndex);
+        request.getPreviousLog().getIndex() + 1: follower.getMatchIndex() + 1;
+    pendingRequests.clear();
+    follower.decreaseNextIndex(nextIndex);
   }
 
   @Override
@@ -222,37 +223,51 @@ public class GrpcLogAppender extends LogAppender {
      */
     @Override
     public void onNext(AppendEntriesReplyProto reply) {
+      final AppendEntriesRequestProto request = 
pendingRequests.remove(reply.getServerReply().getCallId());
       if (LOG.isDebugEnabled()) {
-        LOG.debug("{}: received {} reply {} ", getFollower().getName(),
-            firstResponseReceived? "a": "the first", 
ServerProtoUtils.toString(reply));
+        LOG.debug("{}: received {} reply {}, request={}",
+            follower.getName(), firstResponseReceived? "a": "the first",
+            ServerProtoUtils.toString(reply), 
ServerProtoUtils.toString(request));
       }
 
       try {
-        onNextImpl(reply);
+        onNextImpl(request, reply);
       } catch(Throwable t) {
-        LOG.error("Failed onNext " + reply, t);
+        LOG.error("Failed onNext request=" + ServerProtoUtils.toString(request)
+            + ", reply=" + ServerProtoUtils.toString(reply), t);
       }
     }
 
-    private void onNextImpl(AppendEntriesReplyProto reply) {
+    private void onNextImpl(AppendEntriesRequestProto request, 
AppendEntriesReplyProto reply) {
       // update the last rpc time
       follower.updateLastRpcResponseTime();
 
       if (!firstResponseReceived) {
         firstResponseReceived = true;
       }
+      if (request == null) {
+        // The request is already handled (probably timeout), ignore the reply.
+        LOG.warn("{}: Request not found, ignoring reply: {}", this, 
ServerProtoUtils.toString(reply));
+        return;
+      }
+
       switch (reply.getResult()) {
         case SUCCESS:
-          onSuccess(reply);
+          updateCommitIndex(reply.getFollowerCommit());
+          if (checkAndUpdateMatchIndex(request)) {
+            submitEventOnSuccessAppend();
+          }
           break;
         case NOT_LEADER:
-          onNotLeader(reply);
+          if (checkResponseTerm(reply.getTerm())) {
+            return;
+          }
           break;
         case INCONSISTENCY:
-          onInconsistency(reply);
+          checkAndUpdateNextIndex(request, reply.getNextIndex());
           break;
         default:
-          break;
+          throw new IllegalStateException("Unexpected reply result: " + 
reply.getResult());
       }
       notifyAppend();
     }
@@ -269,7 +284,7 @@ public class GrpcLogAppender extends LogAppender {
       GrpcUtil.warn(LOG, () -> getFollower().getName() + ": Failed 
appendEntries", t);
 
       long callId = GrpcUtil.getCallId(t);
-      resetClient(pendingRequests.get(callId));
+      resetClient(pendingRequests.remove(callId));
     }
 
     @Override
@@ -279,61 +294,19 @@ public class GrpcLogAppender extends LogAppender {
     }
   }
 
-  private void clearPendingRequests(long newNextIndex) {
-    pendingRequests.clear();
-    follower.decreaseNextIndex(newNextIndex);
+  private boolean checkAndUpdateMatchIndex(AppendEntriesRequestProto request) {
+    final int n = request.getEntriesCount();
+    final long newMatchIndex = n == 0? request.getPreviousLog().getIndex(): 
request.getEntries(n - 1).getIndex();
+    return follower.updateMatchIndex(newMatchIndex);
   }
 
-  private synchronized void onSuccess(AppendEntriesReplyProto reply) {
-    AppendEntriesRequestProto request = 
pendingRequests.remove(reply.getServerReply().getCallId());
-    if (request == null) {
-      // If reply comes after timeout, the reply is ignored.
-      LOG.warn("{}: Request not found, ignoring SUCCESS reply: {}", this, 
ServerProtoUtils.toString(reply));
-      return;
-    }
-    updateCommitIndex(reply.getFollowerCommit());
-
-    final long replyNextIndex = reply.getNextIndex();
-    final long lastIndex = replyNextIndex - 1;
-    final boolean updateMatchIndex;
-
-    if (request.getEntriesCount() == 0) {
-      Preconditions.assertTrue(!request.hasPreviousLog() ||
-              lastIndex == request.getPreviousLog().getIndex(),
-          "reply's next index is %s, request's previous is %s",
-          replyNextIndex, request.getPreviousLog());
-      updateMatchIndex = request.hasPreviousLog() && follower.getMatchIndex() 
< lastIndex;
-    } else {
-      // check if the reply and the pending request is consistent
-      final long lastEntryIndex = request
-          .getEntries(request.getEntriesCount() - 1).getIndex();
-      Preconditions.assertTrue(lastIndex == lastEntryIndex,
-          "reply's next index is %s, request's last entry index is %s",
-          replyNextIndex, lastEntryIndex);
-      updateMatchIndex = true;
-    }
-    if (updateMatchIndex) {
-      follower.updateMatchIndex(lastIndex);
-      submitEventOnSuccessAppend();
-    }
-  }
-
-  private void onNotLeader(AppendEntriesReplyProto reply) {
-    checkResponseTerm(reply.getTerm());
-    // the running loop will end and the connection will onComplete
-  }
-
-  private synchronized void onInconsistency(AppendEntriesReplyProto reply) {
-    AppendEntriesRequestProto request = 
pendingRequests.remove(reply.getServerReply().getCallId());
-    if (request == null) {
-      // If reply comes after timeout, the reply is ignored.
-      LOG.warn("{}: Request not found, ignoring INCONSISTENCY reply: {}", 
this, ServerProtoUtils.toString(reply));
-      return;
-    }
+  private void checkAndUpdateNextIndex(AppendEntriesRequestProto request, long 
replyNextIndex) {
     Preconditions.assertTrue(request.hasPreviousLog());
-    if (request.getPreviousLog().getIndex() >= reply.getNextIndex()) {
-      pendingRequests.clear();
-      follower.updateNextIndex(reply.getNextIndex());
+    if (request.getPreviousLog().getIndex() >= replyNextIndex) {
+      synchronized (this) {
+        pendingRequests.clear();
+        follower.updateNextIndex(replyNextIndex);
+      }
     }
   }
 
@@ -514,7 +487,7 @@ public class GrpcLogAppender extends LogAppender {
    * its own State Machine.
    * @return the first available log's start term index
    */
-  protected TermIndex shouldNotifyToInstallSnapshot() {
+  private TermIndex shouldNotifyToInstallSnapshot() {
     if (follower.getNextIndex() < raftLog.getStartIndex()) {
       // The Leader does not have the logs from the Follower's last log
       // index onwards. And install snapshot is disabled. So the Follower
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index 6d52b7b..ec02553 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -62,8 +62,8 @@ public class FollowerInfo {
     return matchIndex.get();
   }
 
-  public void updateMatchIndex(long newMatchIndex) {
-    matchIndex.updateIncreasingly(newMatchIndex, debugIndexChange);
+  public boolean updateMatchIndex(long newMatchIndex) {
+    return matchIndex.updateToMax(newMatchIndex, debugIndexChange);
   }
 
   /** @return the commit index acked by the follower. */
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index fed52bc..0052a2f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -170,17 +170,16 @@ public class LogAppender {
   }
 
   private TermIndex getPrevious() {
-    TermIndex previous = raftLog.getTermIndex(follower.getNextIndex() - 1);
-    if (previous == null) {
-      // if previous is null, nextIndex must be equal to the log start
-      // index (otherwise we will install snapshot).
-      Preconditions.assertTrue(follower.getNextIndex() == 
raftLog.getStartIndex(),
-          "%s: follower's next index %s, local log start index %s",
-          this, follower.getNextIndex(), raftLog.getStartIndex());
-      SnapshotInfo snapshot = server.getState().getLatestSnapshot();
-      previous = snapshot == null ? null : snapshot.getTermIndex();
+    final long nextIndex = follower.getNextIndex();
+    final TermIndex previous = raftLog.getTermIndex(nextIndex - 1);
+    if (previous != null) {
+      return previous;
     }
-    return previous;
+    final long logStartIndex = raftLog.getStartIndex();
+    Preconditions.assertTrue(nextIndex == logStartIndex,
+        "%s: follower's nextIndex = %s != logStartIndex = %s", this, 
nextIndex, logStartIndex);
+    final SnapshotInfo snapshot = server.getState().getLatestSnapshot();
+    return snapshot == null ? null : snapshot.getTermIndex();
   }
 
   protected AppendEntriesRequestProto createRequest(long callId) throws 
RaftLogIOException {
@@ -525,12 +524,14 @@ public class LogAppender {
     return halfMinTimeoutMs - follower.getLastRpcTime().elapsedTimeMs();
   }
 
-  protected void checkResponseTerm(long responseTerm) {
+  protected boolean checkResponseTerm(long responseTerm) {
     synchronized (server) {
       if (isAppenderRunning() && follower.isAttendingVote()
           && responseTerm > leaderState.getCurrentTerm()) {
         leaderState.submitStepDownEvent(responseTerm);
+        return true;
       }
     }
+    return false;
   }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f242aac..584332d 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -888,14 +888,13 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
 
     final long currentTerm;
     final long followerCommit = state.getLog().getLastCommittedIndex();
-    final long nextIndex = state.getNextIndex();
     final Optional<FollowerState> followerState;
     synchronized (this) {
       final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
       currentTerm = state.getCurrentTerm();
       if (!recognized) {
         final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
-            leaderId, getId(), groupId, currentTerm, followerCommit, 
nextIndex, NOT_LEADER, callId);
+            leaderId, getId(), groupId, currentTerm, followerCommit, 
state.getNextIndex(), NOT_LEADER, callId);
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} 
reply: {}",
               getId(), leaderId, leaderTerm, state, 
ServerProtoUtils.toString(reply));
@@ -919,12 +918,11 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
       //      1. There is a snapshot installation in progress
       //      2. There is an overlap between the snapshot index and the entries
       //      3. There is a gap between the local log and the entries
-      // In any of these scenarios, we should retrun an INCONSISTENCY reply
-      // back to leader so that the leader can update this follower's next
-      // index.
+      // In any of these scenarios, we should return an INCONSISTENCY reply
+      // back to leader so that the leader can update this follower's next 
index.
 
       AppendEntriesReplyProto inconsistencyReply = 
checkInconsistentAppendEntries(
-          leaderId, currentTerm, followerCommit, previous, nextIndex, callId, 
entries);
+          leaderId, currentTerm, followerCommit, previous, callId, entries);
       if (inconsistencyReply != null) {
         followerState.ifPresent(fs -> 
fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
         return CompletableFuture.completedFuture(inconsistencyReply);
@@ -956,73 +954,46 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
   }
 
   private AppendEntriesReplyProto checkInconsistentAppendEntries(RaftPeerId 
leaderId, long currentTerm,
-      long followerCommit, TermIndex previous, long nextIndex, long callId, 
LogEntryProto... entries) {
-    long replyNextIndex = -1;
-
-    // Check if a snapshot installation through state machine is in progress.
-    if (inProgressInstallSnapshotRequest.get() != null) {
-      replyNextIndex = Math.min(nextIndex, previous.getIndex());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("{}: Cannot append entries as snapshot installation is in " +
-            "progress. Follower next index: {}", getId(), replyNextIndex);
-      }
+      long followerCommit, TermIndex previous, long callId, LogEntryProto... 
entries) {
+    final long replyNextIndex = checkInconsistentAppendEntries(previous, 
entries);
+    if (replyNextIndex == -1) {
+      return null;
     }
 
-    // If a snapshot installation has happened, the new snapshot might
-    // include the log entry indices sent as part of the
-    // AppendEntriesRequestProto. Check that the first log entry proto is
-    // greater than the last index included in the latest snapshot. If not,
-    // the leader should be informed about the new snapshot index so that
-    // it can send log entries only from the next log index
-    long snapshotIndex = state.getSnapshotIndex();
-    if (snapshotIndex > 0 && entries != null && entries.length > 0
-        && entries[0].getIndex() <= snapshotIndex) {
-      replyNextIndex = snapshotIndex + 1;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("{}: Cannot append entries as latest snapshot already has " +
-            "the append entries. Snapshot index: {}, first append entry " +
-            "index: {}.", getId(), snapshotIndex, entries[0].getIndex());
-      }
-    }
+    final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
+        leaderId, getId(), groupId, currentTerm, followerCommit, 
replyNextIndex, INCONSISTENCY, callId);
+    LOG.info("{}: inconsistency entries. Reply:{}", getId(), 
ServerProtoUtils.toString(reply));
+    return reply;
+  }
 
-    // We need to check if "previous" is in the local peer. Note that it is
-    // possible that "previous" is covered by the latest snapshot: e.g.,
-    // it's possible there's no log entries outside of the latest snapshot.
-    // However, it is not possible that "previous" index is smaller than the
-    // last index included in snapshot. This is because indices <= snapshot's
-    // last index should have been committed.
-    if (previous != null && !containPrevious(previous)) {
-      replyNextIndex = Math.min(nextIndex, previous.getIndex());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("{}: Cannot append entries as there is a gap between " +
-            "local log and append entries. Previous is not present. " +
-            "Previous: {}, follower next index: {}", getId(), previous, 
replyNextIndex);
-      }
+  private long checkInconsistentAppendEntries(TermIndex previous, 
LogEntryProto... entries) {
+    // Check if a snapshot installation through state machine is in progress.
+    final TermIndex installSnapshot = inProgressInstallSnapshotRequest.get();
+    if (installSnapshot != null) {
+      LOG.info("{}: Failed appendEntries as snapshot ({}) installation is in 
progress", getId(), installSnapshot);
+      return installSnapshot.getIndex();
     }
 
-    if (replyNextIndex != -1) {
-      final AppendEntriesReplyProto reply = 
ServerProtoUtils.toAppendEntriesReplyProto(
-          leaderId, getId(), groupId, currentTerm, followerCommit, 
replyNextIndex,
-          INCONSISTENCY, callId);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("{}: inconsistency entries. Reply:{}", getId(), 
ServerProtoUtils.toString(reply));
+    // Check that the first log entry is greater than the snapshot index in 
the latest snapshot.
+    // If not, reply to the leader the new next index.
+    if (entries != null && entries.length > 0) {
+      final long firstEntryIndex = entries[0].getIndex();
+      final long snapshotIndex = state.getSnapshotIndex();
+      if (snapshotIndex > 0 && snapshotIndex >= firstEntryIndex) {
+        LOG.info("{}: Failed appendEntries as latest snapshot ({}) already has 
the append entries (first index: {})",
+            getId(), snapshotIndex, firstEntryIndex);
+        return snapshotIndex + 1;
       }
-      return reply;
     }
 
-    return null;
-  }
-
-  private boolean containPrevious(TermIndex previous) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("{}: prev:{}, latestSnapshot:{}, latestInstalledSnapshot:{}",
-          getId(), previous, state.getLatestSnapshot(), 
state.getLatestInstalledSnapshot());
+    // Check if "previous" is contained in current state.
+    if (previous != null && !state.containsTermIndex(previous)) {
+      final long replyNextIndex = Math.min(state.getNextIndex(), 
previous.getIndex());
+      LOG.info("{}: Failed appendEntries as previous log entry ({}) is not 
found", getId(), previous);
+      return replyNextIndex;
     }
-    return state.getLog().contains(previous)
-        ||  (state.getLatestSnapshot() != null
-             && state.getLatestSnapshot().getTermIndex().equals(previous))
-        || (state.getLatestInstalledSnapshot() != null)
-             && state.getLatestInstalledSnapshot().equals(previous);
+
+    return -1;
   }
 
   @Override
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index ed5c728..ff1e368 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -94,6 +94,9 @@ public interface ServerProtoUtils {
         : "size=" + entries.size() + ", first=" + 
toLogEntryString(entries.get(0));
   }
   static String toString(AppendEntriesRequestProto proto) {
+    if (proto == null) {
+      return null;
+    }
     return ProtoUtils.toString(proto.getServerRequest()) + "-t" + 
proto.getLeaderTerm()
         + ", previous=" + toTermIndexString(proto.getPreviousLog())
         + ", leaderCommit=" + proto.getLeaderCommit()
@@ -101,18 +104,19 @@ public interface ServerProtoUtils {
         + ", entries: " + toShortString(proto.getEntriesList());
   }
   static String toString(AppendEntriesReplyProto reply) {
-    return toString(reply.getServerReply()) + "," + reply.getResult()
+    if (reply == null) {
+      return null;
+    }
+    return ProtoUtils.toString(reply.getServerReply()) + "," + 
reply.getResult()
         + ",nextIndex:" + reply.getNextIndex() + ",term:" + reply.getTerm()
         + ",followerCommit:" + reply.getFollowerCommit();
   }
 
   static String toString(RequestVoteReplyProto proto) {
-    return toString(proto.getServerReply()) + "-t" + proto.getTerm();
-  }
-
-  static String toString(RaftRpcReplyProto reply) {
-    return reply.getRequestorId().toStringUtf8() + "->"
-        + reply.getReplyId().toStringUtf8() + "," + reply.getSuccess();
+    if (proto == null) {
+      return null;
+    }
+    return ProtoUtils.toString(proto.getServerReply()) + "-t" + 
proto.getTerm();
   }
 
   static RaftConfigurationProto.Builder 
toRaftConfigurationProto(RaftConfiguration conf) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 8f2cbed..aa21956 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -42,6 +42,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 import static org.apache.ratis.server.impl.RaftServerImpl.LOG;
@@ -87,7 +88,7 @@ public class ServerState implements Closeable {
    * snapshot. Once we successfully install a snapshot, the SM may not pick it 
up immediately.
    * Further, this will not get updated when SM does snapshots itself.
    */
-  private volatile TermIndex latestInstalledSnapshot;
+  private final AtomicReference<TermIndex> latestInstalledSnapshot = new 
AtomicReference<>();
 
   ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop,
               RaftServerImpl server, StateMachine stateMachine)
@@ -407,35 +408,31 @@ public class ServerState implements Closeable {
     StateMachine sm = server.getStateMachine();
     sm.pause(); // pause the SM to prepare for install snapshot
     snapshotManager.installSnapshot(sm, request);
-    log.syncWithSnapshot(request.getSnapshotChunk().getTermIndex().getIndex());
-    this.latestInstalledSnapshot = ServerProtoUtils.toTermIndex(
-        request.getSnapshotChunk().getTermIndex());
+    
updateInstalledSnapshotIndex(ServerProtoUtils.toTermIndex(request.getSnapshotChunk().getTermIndex()));
   }
 
   void updateInstalledSnapshotIndex(TermIndex lastTermIndexInSnapshot) {
     log.syncWithSnapshot(lastTermIndexInSnapshot.getIndex());
-    this.latestInstalledSnapshot = lastTermIndexInSnapshot;
+    latestInstalledSnapshot.set(lastTermIndexInSnapshot);
   }
 
   SnapshotInfo getLatestSnapshot() {
-    return 
server.getStateMachine().getStateMachineStorage().getLatestSnapshot();
+    return server.getStateMachine().getLatestSnapshot();
   }
 
-  public TermIndex getLatestInstalledSnapshot() {
-    return latestInstalledSnapshot;
+  public long getLatestInstalledSnapshotIndex() {
+    final TermIndex ti = latestInstalledSnapshot.get();
+    return ti != null? ti.getIndex(): 0L;
   }
 
   /**
-   * The last index included in either the latestSnapshot or the
-   * latestInsalledSnapshot
+   * The last index included in either the latestSnapshot or the 
latestInstalledSnapshot
    * @return the last snapshot index
    */
-  public long getSnapshotIndex() {
-    final long latestSnapshotIndex = getLatestSnapshot() != null ?
-        getLatestSnapshot().getIndex() : 0;
-    final long latestInstalledSnapshotIndex = latestInstalledSnapshot != null ?
-        latestInstalledSnapshot.getIndex() : 0;
-    return Math.max(latestSnapshotIndex, latestInstalledSnapshotIndex);
+  long getSnapshotIndex() {
+    final SnapshotInfo s = getLatestSnapshot();
+    final long latestSnapshotIndex = s != null ? s.getIndex() : 0;
+    return Math.max(latestSnapshotIndex, getLatestInstalledSnapshotIndex());
   }
 
   public long getNextIndex() {
@@ -447,4 +444,16 @@ public class ServerState implements Closeable {
   public long getLastAppliedIndex() {
     return stateMachineUpdater.getLastAppliedIndex();
   }
+
+  boolean containsTermIndex(TermIndex ti) {
+    Objects.requireNonNull(ti, "ti == null");
+
+    if 
(Optional.ofNullable(latestInstalledSnapshot.get()).filter(ti::equals).isPresent())
 {
+      return true;
+    }
+    if 
(Optional.ofNullable(getLatestSnapshot()).map(SnapshotInfo::getTermIndex).filter(ti::equals).isPresent())
 {
+      return true;
+    }
+    return log.contains(ti);
+  }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index f1379e4..84e20ab 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
@@ -123,11 +124,8 @@ public abstract class RaftLog implements 
RaftLogSequentialOps, Closeable {
    * by the leader.
    */
   public boolean contains(TermIndex ti) {
-    if (ti == null) {
-      return false;
-    }
-    TermIndex local = getTermIndex(ti.getIndex());
-    return ti.equals(local);
+    Objects.requireNonNull(ti, "ti == null");
+    return ti.equals(getTermIndex(ti.getIndex()));
   }
 
   /**
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index ac3813f..5e5c9c6 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -292,8 +292,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
           snapshot == null ? null : snapshot.getFile());
       return RaftServerConstants.INVALID_LOG_INDEX;
     } else {
-      LOG.info("Loading snapshot with t:{}, i:{}, file:{}", snapshot.getTerm(),
-          snapshot.getIndex(), snapshot.getFile().getPath());
+      LOG.info("Loading snapshot {}", snapshot);
       final long endIndex = snapshot.getIndex();
       try (LogInputStream in = new LogInputStream(
           snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
index e512262..7f75fff 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
@@ -175,7 +175,7 @@ public class TestInstallSnapshotWithGrpc {
       for (RaftServerImpl follower : cluster.getFollowers()) {
         follower.getState().getStorage().getStorageDir().getStateMachineDir();
         Assert.assertEquals(leaderSnapshotInfo.getIndex(),
-            follower.getState().getLatestInstalledSnapshot().getIndex());
+            follower.getState().getLatestInstalledSnapshotIndex());
       }
 
       // restart the peer and check if it can correctly handle conf change

Reply via email to