This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 8620e09  RATIS-514. Check if leader and follower have same configuration for installSnapshot.  Contributed by Hanisha Koneru
8620e09 is described below

commit 8620e09b952b88a68a6912d26e53aa308a3c3b8c
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Apr 10 16:51:48 2019 +0800

    RATIS-514. Check if leader and follower have same configuration for installSnapshot.  Contributed by Hanisha Koneru
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  6 +++-
 ratis-proto/src/main/proto/Raft.proto              | 41 +++++++++++++++-------
 .../org/apache/ratis/server/impl/LogAppender.java  |  2 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 32 +++++++++++------
 .../apache/ratis/server/impl/ServerProtoUtils.java | 39 ++++++++++++++------
 .../org/apache/ratis/server/impl/ServerState.java  |  4 +--
 .../ratis/server/storage/SnapshotManager.java      |  8 +++--
 7 files changed, 91 insertions(+), 41 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 007284e..7d9e018 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -347,7 +347,7 @@ public class GrpcLogAppender extends LogAppender {
     }
 
     synchronized void addPending(InstallSnapshotRequestProto request) {
-      pending.offer(request.getRequestIndex());
+      pending.offer(request.getSnapshotChunk().getRequestIndex());
     }
 
     synchronized void removePending(InstallSnapshotReplyProto reply) {
@@ -396,6 +396,10 @@ public class GrpcLogAppender extends LogAppender {
         case NOT_LEADER:
           checkResponseTerm(reply.getTerm());
           break;
+        case CONF_MISMATCH:
+          LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
+              server.getId(), RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
+              server.getId(), installSnapshotEnabled, getFollowerId(), !installSnapshotEnabled);
         case UNRECOGNIZED:
           break;
       }
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 96b91ec..2d24a50 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -126,6 +126,7 @@ enum InstallSnapshotResult {
   NOT_LEADER = 1;
   IN_PROGRESS = 2;
   ALREADY_INSTALLED = 3;
+  CONF_MISMATCH = 4;
 }
 
 message RequestVoteRequestProto {
@@ -172,24 +173,38 @@ message AppendEntriesReplyProto {
 }
 
 message InstallSnapshotRequestProto {
+  message SnapshotChunkProto {
+    string requestId = 1; // an identifier for chunked-requests.
+    uint32 requestIndex = 2; // the index for this request chunk. Starts from 0.
+    RaftConfigurationProto raftConfiguration = 3;
+    TermIndexProto termIndex = 4;
+    repeated FileChunkProto fileChunks = 5;
+    uint64 totalSize = 6;
+    bool done = 7; // whether this is the final chunk for the same req.
+  }
+
+  message NotificationProto {
+    TermIndexProto firstAvailableTermIndex = 1; // first available log index to notify Follower to install snapshot.
+  }
+
   RaftRpcRequestProto serverRequest = 1;
-  string requestId = 2; // an identifier for chunked-requests.
-  uint32 requestIndex = 3; // the index for this request chunk. Starts from 0.
-  RaftConfigurationProto raftConfiguration = 4;
-  uint64 leaderTerm = 5;
-  TermIndexProto termIndex = 6;
-  repeated FileChunkProto fileChunks = 7;
-  uint64 totalSize = 8;
-  bool done = 9; // whether this is the final chunk for the same req.
-  TermIndexProto firstAvailableLogIndex = 11; // first available log index to notify Follower to install snapshot
+  uint64 leaderTerm = 2;
+
+  oneof InstallSnapshotRequestBody {
+    SnapshotChunkProto snapshotChunk = 3;
+    NotificationProto notification = 4;
+  }
 }
 
 message InstallSnapshotReplyProto {
   RaftRpcReplyProto serverReply = 1;
-  uint32 requestIndex = 2;
-  uint64 term = 3;
-  InstallSnapshotResult result = 4;
-  uint64 snapshotIndex = 5;
+  uint64 term = 2;
+  InstallSnapshotResult result = 3;
+
+  oneof InstallSnapshotReplyBody {
+    uint32 requestIndex = 4;  // index of the snapshot chunk request.
+    uint64 snapshotIndex = 5; // index of snapshot installed after notification.
+  }
 }
 
 message ClientMessageEntryProto {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 136b4d9..82c942d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -130,7 +130,7 @@ public class LogAppender {
     return follower;
   }
 
-  RaftPeerId getFollowerId() {
+  protected RaftPeerId getFollowerId() {
     return getFollower().getPeer().getId();
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6788e96..b187108 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1032,19 +1032,31 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     // Check if install snapshot from Leader is enabled
     if (installSnapshotEnabled) {
       // Leader has sent InstallSnapshot request with SnapshotInfo. Install the snapshot.
-      return checkAndInstallSnapshot(request, leaderId);
+      if (request.hasSnapshotChunk()) {
+        return checkAndInstallSnapshot(request, leaderId);
+      }
     } else {
       // Leader has only sent a notification to install snapshot. Inform State Machine to install snapshot.
-      return notifyStateMachineToInstallSnapshot(request, leaderId);
+      if (request.hasNotification()) {
+        return notifyStateMachineToInstallSnapshot(request, leaderId);
+      }
     }
+    // There is a mismatch between configurations on leader and follower.
+    final InstallSnapshotReplyProto reply = ServerProtoUtils
+        .toInstallSnapshotReplyProto(leaderId, getId(), groupId,
+            InstallSnapshotResult.CONF_MISMATCH);
+    LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
+        getId(), RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
+        leaderId, request.hasSnapshotChunk(), getId(), installSnapshotEnabled);
+    return reply;
   }
 
   private InstallSnapshotReplyProto checkAndInstallSnapshot(
       InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
     final long currentTerm;
     final long leaderTerm = request.getLeaderTerm();
-    final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(
-        request.getTermIndex());
+    InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
+    final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(snapshotChunkRequest.getTermIndex());
     final long lastIncludedIndex = lastTermIndex.getIndex();
     final Optional<FollowerState> followerState;
     synchronized (this) {
@@ -1053,7 +1065,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       if (!recognized) {
         final InstallSnapshotReplyProto reply = ServerProtoUtils
             .toInstallSnapshotReplyProto(leaderId, getId(), groupId, currentTerm,
-                request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
+                snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
         LOG.debug("{}: do not recognize leader for installing snapshot." +
             " Reply: {}", getId(), reply);
         return reply;
@@ -1076,19 +1088,19 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
 
         // update the committed index
         // re-load the state machine if this is the last chunk
-        if (request.getDone()) {
+        if (snapshotChunkRequest.getDone()) {
           state.reloadStateMachine(lastIncludedIndex, leaderTerm);
         }
       } finally {
         updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
       }
     }
-    if (request.getDone()) {
+    if (snapshotChunkRequest.getDone()) {
       LOG.info("{}:{} successfully install the whole snapshot-{}", getId(), getGroupId(),
           lastIncludedIndex);
     }
     return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(), groupId,
-        currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
+        currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS);
   }
 
   private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
@@ -1096,7 +1108,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     final long currentTerm;
     final long leaderTerm = request.getLeaderTerm();
     final TermIndex firstAvailableLogTermIndex = ServerProtoUtils.toTermIndex(
-        request.getFirstAvailableLogIndex());
+        request.getNotification().getFirstAvailableTermIndex());
     final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
 
     synchronized (this) {
@@ -1105,7 +1117,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       if (!recognized) {
         final InstallSnapshotReplyProto reply = ServerProtoUtils
             .toInstallSnapshotReplyProto(leaderId, getId(), groupId, currentTerm,
-                request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
+                InstallSnapshotResult.NOT_LEADER, -1);
         LOG.debug("{}: do not recognize leader for installing snapshot." +
             " Reply: {}", getId(), reply);
         return reply;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index bb5a1fa..adc593e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -276,52 +276,69 @@ public interface ServerProtoUtils {
 
   static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
       RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
-      long term, int requestIndex, InstallSnapshotResult result) {
+      long currentTerm, int requestIndex, InstallSnapshotResult result) {
     final RaftRpcReplyProto.Builder rb = toRaftRpcReplyProtoBuilder(requestorId,
         replyId, groupId, result == InstallSnapshotResult.SUCCESS);
     final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
-        .newBuilder().setServerReply(rb).setTerm(term).setResult(result)
+        .newBuilder().setServerReply(rb).setTerm(currentTerm).setResult(result)
         .setRequestIndex(requestIndex);
     return builder.build();
   }
 
   static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
       RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
-      long term, InstallSnapshotResult result, long installedSnapshotIndex) {
+      long currentTerm, InstallSnapshotResult result, long installedSnapshotIndex) {
     final RaftRpcReplyProto.Builder rb = toRaftRpcReplyProtoBuilder(requestorId,
         replyId, groupId, result == InstallSnapshotResult.SUCCESS);
     final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
-        .newBuilder().setServerReply(rb).setTerm(term).setResult(result);
+        .newBuilder().setServerReply(rb).setTerm(currentTerm).setResult(result);
     if (installedSnapshotIndex > 0) {
       builder.setSnapshotIndex(installedSnapshotIndex);
     }
     return builder.build();
   }
 
+  static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
+      RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
+      InstallSnapshotResult result) {
+    final RaftRpcReplyProto.Builder rb = toRaftRpcReplyProtoBuilder(requestorId,
+        replyId, groupId, result == InstallSnapshotResult.SUCCESS);
+    final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
+        .newBuilder().setServerReply(rb).setResult(result);
+    return builder.build();
+  }
+
   static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
       RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, String requestId, int requestIndex,
       long term, TermIndex lastTermIndex, List<FileChunkProto> chunks,
       long totalSize, boolean done) {
+    final InstallSnapshotRequestProto.SnapshotChunkProto.Builder snapshotChunkProto =
+        InstallSnapshotRequestProto.SnapshotChunkProto.newBuilder()
+            .setRequestId(requestId)
+            .setRequestIndex(requestIndex)
+            .setTermIndex(toTermIndexProto(lastTermIndex))
+            .addAllFileChunks(chunks)
+            .setTotalSize(totalSize)
+            .setDone(done);
     return InstallSnapshotRequestProto.newBuilder()
         .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId, groupId))
-        .setRequestId(requestId)
-        .setRequestIndex(requestIndex)
         // .setRaftConfiguration()  TODO: save and pass RaftConfiguration
         .setLeaderTerm(term)
-        .setTermIndex(toTermIndexProto(lastTermIndex))
-        .addAllFileChunks(chunks)
-        .setTotalSize(totalSize)
-        .setDone(done).build();
+        .setSnapshotChunk(snapshotChunkProto)
+        .build();
   }
 
   static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
       RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
       long leaderTerm, TermIndex firstAvailable) {
+    final InstallSnapshotRequestProto.NotificationProto.Builder notificationProto =
+        InstallSnapshotRequestProto.NotificationProto.newBuilder()
+            .setFirstAvailableTermIndex(toTermIndexProto(firstAvailable));
     return InstallSnapshotRequestProto.newBuilder()
         .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId, groupId))
         // .setRaftConfiguration()  TODO: save and pass RaftConfiguration
         .setLeaderTerm(leaderTerm)
-        .setFirstAvailableLogIndex(toTermIndexProto(firstAvailable))
+        .setNotification(notificationProto)
         .build();
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 8fb55f2..8f2cbed 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -407,9 +407,9 @@ public class ServerState implements Closeable {
     StateMachine sm = server.getStateMachine();
     sm.pause(); // pause the SM to prepare for install snapshot
     snapshotManager.installSnapshot(sm, request);
-    log.syncWithSnapshot(request.getTermIndex().getIndex());
+    log.syncWithSnapshot(request.getSnapshotChunk().getTermIndex().getIndex());
     this.latestInstalledSnapshot = ServerProtoUtils.toTermIndex(
-        request.getTermIndex());
+        request.getSnapshotChunk().getTermIndex());
   }
 
   void updateInstalledSnapshotIndex(TermIndex lastTermIndexInSnapshot) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index 77a9963..f0afc2b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -54,7 +54,9 @@ public class SnapshotManager {
 
   public void installSnapshot(StateMachine stateMachine,
       InstallSnapshotRequestProto request) throws IOException {
-    final long lastIncludedIndex = request.getTermIndex().getIndex();
+    final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest =
+        request.getSnapshotChunk();
+    final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();
     final RaftStorageDirectory dir = storage.getStorageDir();
 
     File tmpDir = dir.getNewTempDir();
@@ -66,7 +68,7 @@ public class SnapshotManager {
     // TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
     // and are not lost when whole request cycle is done. Check requestId and requestIndex here
 
-    for (FileChunkProto chunk : request.getFileChunksList()) {
+    for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) {
       SnapshotInfo pi = stateMachine.getLatestSnapshot();
       if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
         throw new IOException("There exists snapshot file "
@@ -124,7 +126,7 @@ public class SnapshotManager {
       }
     }
 
-    if (request.getDone()) {
+    if (snapshotChunkRequest.getDone()) {
       LOG.info("Install snapshot is done, renaming tnp dir:{} to:{}",
           tmpDir, dir.getStateMachineDir());
       dir.getStateMachineDir().delete();

Reply via email to