This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dcd2c1  RATIS-1476. Refactor the snapshot installation code to a new class. (#569)
0dcd2c1 is described below

commit 0dcd2c1822e42cc20832730575ca0916ac1dea16
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Mar 25 15:47:58 2022 +0800

    RATIS-1476. Refactor the snapshot installation code to a new class. (#569)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 290 +-----------------
 .../server/impl/SnapshotInstallationHandler.java   | 337 +++++++++++++++++++++
 2 files changed, 347 insertions(+), 280 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index a62fad0..a7b6ac7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,9 +17,6 @@
  */
 package org.apache.ratis.server.impl;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.conf.RaftProperties;
@@ -40,13 +37,13 @@ import 
org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.server.DataStreamMap;
 import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.DivisionProperties;
-import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerMXBean;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.impl.LeaderElection.Phase;
 import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
+import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.leader.LeaderState;
 import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.server.metrics.LeaderElectionMetrics;
@@ -71,6 +68,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.NoSuchFileException;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -149,10 +147,8 @@ class RaftServerImpl implements RaftServer.Division,
   private final Info info =  new Info();
 
   private final DivisionProperties divisionProperties;
-  private final int maxTimeoutMs;
   private final TimeDuration leaderStepDownWaitTime;
   private final TimeDuration sleepDeviationThreshold;
-  private final boolean installSnapshotEnabled;
 
   private final LifeCycle lifeCycle;
   private final ServerState state;
@@ -169,10 +165,6 @@ class RaftServerImpl implements RaftServer.Division,
   private final LeaderElectionMetrics leaderElectionMetrics;
   private final RaftServerMetricsImpl raftServerMetrics;
 
-  private final AtomicLong inProgressInstallSnapshotRequest;
-  private final AtomicLong installedSnapshotIndex;
-  private final AtomicBoolean isSnapshotNull;
-
   // To avoid append entry before complete start() method
   // For example, if thread1 start(), but before thread1 startAsFollower(), 
thread2 receive append entry
   // request, and change state to RUNNING by 
lifeCycle.compareAndTransition(STARTING, RUNNING),
@@ -182,6 +174,7 @@ class RaftServerImpl implements RaftServer.Division,
 
   private final TransferLeadership transferLeadership;
   private final SnapshotManagementRequestHandler snapshotRequestHandler;
+  private final SnapshotInstallationHandler snapshotInstallationHandler;
 
   private final ExecutorService serverExecutor;
   private final ExecutorService clientExecutor;
@@ -195,17 +188,12 @@ class RaftServerImpl implements RaftServer.Division,
 
     final RaftProperties properties = proxy.getProperties();
     this.divisionProperties = new DivisionPropertiesImpl(properties);
-    maxTimeoutMs = properties().maxRpcTimeoutMs();
     leaderStepDownWaitTime = 
RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
     this.sleepDeviationThreshold = 
RaftServerConfigKeys.sleepDeviationThreshold(properties);
-    installSnapshotEnabled = 
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
     this.proxy = proxy;
 
     this.state = new ServerState(id, group, properties, this, stateMachine);
     this.retryCache = new RetryCacheImpl(properties);
-    this.inProgressInstallSnapshotRequest = new AtomicLong();
-    this.installedSnapshotIndex = new AtomicLong();
-    this.isSnapshotNull = new AtomicBoolean(false);
     this.dataStreamMap = new DataStreamMapImpl(id);
 
     this.jmxAdapter = new RaftServerJmxAdapter();
@@ -223,6 +211,7 @@ class RaftServerImpl implements RaftServer.Division,
 
     this.transferLeadership = new TransferLeadership(this);
     this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
+    this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, 
properties);
 
     this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
         RaftServerConfigKeys.ThreadPool.serverCached(properties),
@@ -244,7 +233,7 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   int getMaxTimeoutMs() {
-    return maxTimeoutMs;
+    return properties().maxRpcTimeoutMs();
   }
 
   TimeDuration getRandomElectionTimeout() {
@@ -560,21 +549,6 @@ class RaftServerImpl implements RaftServer.Division,
         getGroup(), getRoleInfoProto(), 
state.getStorage().getStorageDir().isHealthy());
   }
 
-  private RoleInfoProto getRoleInfoProto(RaftPeer leaderPeerInfo) {
-    RaftPeerRole currentRole = role.getCurrentRole();
-    RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
-        .setSelf(getPeer().getRaftPeerProto())
-        .setRole(currentRole)
-        .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs());
-    final Optional<FollowerState> fs = role.getFollowerState();
-    final ServerRpcProto leaderInfo =
-        ServerProtoUtils.toServerRpcProto(leaderPeerInfo,
-            
fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L));
-    
roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder().setLeaderInfo(leaderInfo)
-        .setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0)));
-    return roleInfo.build();
-  }
-
   RoleInfoProto getRoleInfoProto() {
     RaftPeerRole currentRole = role.getCurrentRole();
     RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
@@ -716,7 +690,7 @@ class RaftServerImpl implements RaftServer.Division,
     return new NotLeaderException(getMemberId(), conf.getPeer(leaderId), 
peers);
   }
 
-  private LifeCycle.State assertLifeCycleState(Set<LifeCycle.State> expected) 
throws ServerNotReadyException {
+  LifeCycle.State assertLifeCycleState(Set<LifeCycle.State> expected) throws 
ServerNotReadyException {
     return lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException(
         getMemberId() + " is not in " + expected + ": current state is " + c),
         expected);
@@ -1044,7 +1018,7 @@ class RaftServerImpl implements RaftServer.Division,
     }
 
     synchronized (this) {
-      long installSnapshot = inProgressInstallSnapshotRequest.get();
+      final long installSnapshot = 
snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
       // check snapshot install/load
       if (installSnapshot != 0) {
         String msg = String.format("%s: Failed do snapshot as snapshot (%s) 
installation is in progress",
@@ -1280,7 +1254,7 @@ class RaftServerImpl implements RaftServer.Division,
     }
   }
 
-  private Optional<FollowerState> updateLastRpcTime(FollowerState.UpdateType 
updateType) {
+  Optional<FollowerState> updateLastRpcTime(FollowerState.UpdateType 
updateType) {
     final Optional<FollowerState> fs = role.getFollowerState();
     if (fs.isPresent() && lifeCycle.getCurrentState() == RUNNING) {
       fs.get().updateLastRpcTime(updateType);
@@ -1417,7 +1391,7 @@ class RaftServerImpl implements RaftServer.Division,
 
   private long checkInconsistentAppendEntries(TermIndex previous, 
LogEntryProto... entries) {
     // Check if a snapshot installation through state machine is in progress.
-    final long installSnapshot = inProgressInstallSnapshotRequest.get();
+    final long installSnapshot = 
snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
     if (installSnapshot != 0) {
       LOG.info("{}: Failed appendEntries as snapshot ({}) installation is in 
progress", getMemberId(), installSnapshot);
       return state.getNextIndex();
@@ -1450,22 +1424,7 @@ class RaftServerImpl implements RaftServer.Division,
 
   @Override
   public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto 
request) throws IOException {
-    if (LOG.isInfoEnabled()) {
-      LOG.info("{}: receive installSnapshot: {}", getMemberId(),
-          ServerStringUtils.toInstallSnapshotRequestString(request));
-    }
-    final InstallSnapshotReplyProto reply;
-    try {
-      reply = installSnapshotImpl(request);
-    } catch (Exception e) {
-      LOG.error("{}: installSnapshot failed", getMemberId(), e);
-      throw e;
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("{}: reply installSnapshot: {}", getMemberId(),
-          ServerStringUtils.toInstallSnapshotReplyString(reply));
-    }
-    return reply;
+    return snapshotInstallationHandler.installSnapshot(request);
   }
 
   boolean pause() {
@@ -1552,235 +1511,6 @@ class RaftServerImpl implements RaftServer.Division,
     }
   }
 
-  private InstallSnapshotReplyProto 
installSnapshotImpl(InstallSnapshotRequestProto request) throws IOException {
-    final RaftRpcRequestProto r = request.getServerRequest();
-    final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
-    final RaftGroupId leaderGroupId = 
ProtoUtils.toRaftGroupId(r.getRaftGroupId());
-    CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(),
-        leaderId, request);
-
-    assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
-    assertGroup(leaderId, leaderGroupId);
-
-    InstallSnapshotReplyProto reply = null;
-    // Check if install snapshot from Leader is enabled
-    if (installSnapshotEnabled) {
-      // Leader has sent InstallSnapshot request with SnapshotInfo. Install 
the snapshot.
-      if (request.hasSnapshotChunk()) {
-        reply = checkAndInstallSnapshot(request, leaderId);
-      }
-    } else {
-      // Leader has only sent a notification to install snapshot. Inform State 
Machine to install snapshot.
-      if (request.hasNotification()) {
-        reply = notifyStateMachineToInstallSnapshot(request, leaderId);
-      }
-    }
-
-    if (reply != null) {
-      if (request.hasLastRaftConfigurationLogEntryProto()) {
-        // Set the configuration included in the snapshot
-        LogEntryProto newConfLogEntryProto =
-            request.getLastRaftConfigurationLogEntryProto();
-        LOG.info("{}: set new configuration {} from snapshot", getMemberId(),
-            newConfLogEntryProto);
-
-        state.setRaftConf(newConfLogEntryProto);
-        state.writeRaftConfiguration(newConfLogEntryProto);
-        
stateMachine.event().notifyConfigurationChanged(newConfLogEntryProto.getTerm(), 
newConfLogEntryProto.getIndex(),
-            newConfLogEntryProto.getConfigurationEntry());
-      }
-      return reply;
-    }
-
-    // There is a mismatch between configurations on leader and follower.
-    final InstallSnapshotReplyProto failedReply = 
ServerProtoUtils.toInstallSnapshotReplyProto(
-        leaderId, getMemberId(), InstallSnapshotResult.CONF_MISMATCH);
-    LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but 
follower {} has it set to {}",
-        getMemberId(), 
RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
-        leaderId, request.hasSnapshotChunk(), getId(), installSnapshotEnabled);
-    return failedReply;
-  }
-
-  private InstallSnapshotReplyProto checkAndInstallSnapshot(
-      InstallSnapshotRequestProto request, RaftPeerId leaderId) throws 
IOException {
-    final long currentTerm;
-    final long leaderTerm = request.getLeaderTerm();
-    InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = 
request.getSnapshotChunk();
-    final long lastIncludedIndex = 
snapshotChunkRequest.getTermIndex().getIndex();
-    synchronized (this) {
-      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
-      currentTerm = state.getCurrentTerm();
-      if (!recognized) {
-        final InstallSnapshotReplyProto reply = 
ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
-            currentTerm, snapshotChunkRequest.getRequestIndex(), 
InstallSnapshotResult.NOT_LEADER);
-        LOG.warn("{}: Failed to recognize leader for installSnapshot chunk.", 
getMemberId());
-        return reply;
-      }
-      changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
-      state.setLeader(leaderId, "installSnapshot");
-
-      updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
-      try {
-        // Check and append the snapshot chunk. We simply put this in lock
-        // considering a follower peer requiring a snapshot installation does 
not
-        // have a lot of requests
-        Preconditions.assertTrue(
-            state.getLog().getNextIndex() <= lastIncludedIndex,
-            "%s log's next id is %s, last included index in snapshot is %s",
-            getMemberId(), state.getLog().getNextIndex(), lastIncludedIndex);
-
-        //TODO: We should only update State with installed snapshot once the 
request is done.
-        state.installSnapshot(request);
-
-        // update the committed index
-        // re-load the state machine if this is the last chunk
-        if (snapshotChunkRequest.getDone()) {
-          state.reloadStateMachine(lastIncludedIndex);
-        }
-      } finally {
-        updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
-      }
-    }
-    if (snapshotChunkRequest.getDone()) {
-      LOG.info("{}: successfully install the entire snapshot-{}", 
getMemberId(), lastIncludedIndex);
-    }
-    return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
-        currentTerm, snapshotChunkRequest.getRequestIndex(), 
InstallSnapshotResult.SUCCESS);
-  }
-
-  private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
-      InstallSnapshotRequestProto request, RaftPeerId leaderId) throws 
IOException {
-    final long currentTerm;
-    final long leaderTerm = request.getLeaderTerm();
-    final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
-        request.getNotification().getFirstAvailableTermIndex());
-    final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
-    synchronized (this) {
-      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
-      currentTerm = state.getCurrentTerm();
-      if (!recognized) {
-        final InstallSnapshotReplyProto reply = 
ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
-            currentTerm, InstallSnapshotResult.NOT_LEADER, -1);
-        LOG.warn("{}: Failed to recognize leader for installSnapshot 
notification.", getMemberId());
-        return reply;
-      }
-      changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
-      state.setLeader(leaderId, "installSnapshot");
-      long snapshotIndex = state.getSnapshotIndex();
-
-      
updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
-      if (inProgressInstallSnapshotRequest.compareAndSet(0, 
firstAvailableLogIndex)) {
-        LOG.info("{}: Received notification to install snapshot at index {}", 
getMemberId(), firstAvailableLogIndex);
-        // Check if snapshot index is already at par or ahead of the first
-        // available log index of the Leader.
-        if (snapshotIndex + 1 >= firstAvailableLogIndex && 
firstAvailableLogIndex > 0) {
-          // State Machine has already installed the snapshot. Return the
-          // latest snapshot index to the Leader.
-
-          
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
-          LOG.info("{}: InstallSnapshot notification result: {}, current 
snapshot index: {}", getMemberId(),
-              InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
-          return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(), currentTerm,
-              InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
-        }
-
-        Optional<RaftPeerProto> leaderPeerInfo = null;
-        if (request.hasLastRaftConfigurationLogEntryProto()) {
-          List<RaftPeerProto> peerList = 
request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry()
-              .getPeersList();
-          leaderPeerInfo = peerList.stream().filter(p -> 
RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst();
-          Preconditions.assertTrue(leaderPeerInfo.isPresent());
-        }
-
-        // For the cases where RaftConf is empty on newly started peer with
-        // empty peer list, we retrieve leader info from
-        // installSnapShotRequestProto.
-        RoleInfoProto roleInfoProto =
-            getRaftConf().getPeer(state.getLeaderId()) == null ?
-                getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo.get())) :
-                getRoleInfoProto();
-        // This is the first installSnapshot notify request for this term and
-        // index. Notify the state machine to install the snapshot.
-        LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's 
first available index is {}.",
-            getMemberId(), state.getLog().getNextIndex(), 
firstAvailableLogIndex);
-        try {
-          
stateMachine.followerEvent().notifyInstallSnapshotFromLeader(roleInfoProto, 
firstAvailableLogTermIndex)
-              .whenComplete((reply, exception) -> {
-                if (exception != null) {
-                  LOG.warn("{}: Failed to notify StateMachine to 
InstallSnapshot. Exception: {}",
-                      getMemberId(), exception.getMessage());
-                  
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
-                  return;
-                }
-
-                if (reply != null) {
-                  LOG.info("{}: StateMachine successfully installed snapshot 
index {}. Reloading the StateMachine.",
-                      getMemberId(), reply.getIndex());
-                  stateMachine.pause();
-                  state.updateInstalledSnapshotIndex(reply);
-                  state.reloadStateMachine(reply.getIndex());
-                  installedSnapshotIndex.set(reply.getIndex());
-                } else {
-                  isSnapshotNull.set(true);
-                  if (LOG.isDebugEnabled()) {
-                    LOG.debug("{}: StateMachine could not install snapshot as 
it is not available", this);
-                  }
-                }
-              // wait for 1 seconds for statemachine to install snapshot
-              }).get(1, TimeUnit.SECONDS);
-        } catch (InterruptedException | TimeoutException t) {
-          //nothing to do
-        } catch (Exception t) {
-          // there are two cases:
-          //1 `get()` may throw ExecutionException if `whenComplete` throw an 
exception
-          //2 when generating completeFuture, 
`statemachine#notifyInstallSnapshotFromLeader`
-          // may throw an uncertain exception, which is determined by the 
implementation of
-          // user statemachine.
-          
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
-          final String err = getMemberId() + ": Failed to notify StateMachine 
to InstallSnapshot.";
-          LOG.warn(err + " " + t);
-          throw new IOException(err, t);
-        }
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{}: StateMachine is processing Snapshot Installation 
Request.", getMemberId());
-        }
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{}: StateMachine is already installing a snapshot.", 
getMemberId());
-        }
-      }
-
-      // If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE.
-      if (isSnapshotNull.compareAndSet(true, false)) {
-        LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
-            InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
-        inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 
0);
-        return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
-            currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
-      }
-
-      // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the 
installed snapshot index and reset
-      // installedSnapshotIndex to 0.
-      long latestInstalledSnapshotIndex = 
this.installedSnapshotIndex.getAndSet(0);
-      if (latestInstalledSnapshotIndex > 0) {
-        LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", 
getMemberId(),
-            InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotIndex);
-        inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 
0);
-        return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
-            currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotIndex);
-      }
-
-      // Otherwise, Snapshot installation is in progress.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
-            InstallSnapshotResult.IN_PROGRESS);
-      }
-      return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
-          currentTerm, InstallSnapshotResult.IN_PROGRESS, -1);
-    }
-  }
-
   void submitUpdateCommitEvent() {
     role.getLeaderState().ifPresent(LeaderStateImpl::submitUpdateCommitEvent);
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
new file mode 100644
index 0000000..ad6f5a4
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.FollowerInfoProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
+import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.proto.RaftProtos.ServerRpcProto;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.util.ServerStringUtils;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.Timestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+class SnapshotInstallationHandler {
+  static final Logger LOG = 
LoggerFactory.getLogger(SnapshotInstallationHandler.class);
+
+  private final RaftServerImpl server;
+  private final ServerState state;
+
+  private final boolean installSnapshotEnabled;
+  private final AtomicLong inProgressInstallSnapshotIndex = new AtomicLong();
+  private final AtomicLong installedSnapshotIndex = new AtomicLong();
+  private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
+
+  SnapshotInstallationHandler(RaftServerImpl server, RaftProperties 
properties) {
+    this.server = server;
+    this.state = server.getState();
+    this.installSnapshotEnabled = 
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
+  }
+
+  RaftGroupMemberId getMemberId() {
+    return state.getMemberId();
+  }
+
+  long getInProgressInstallSnapshotIndex() {
+    return inProgressInstallSnapshotIndex.get();
+  }
+
+  InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto 
request) throws IOException {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("{}: receive installSnapshot: {}", getMemberId(),
+          ServerStringUtils.toInstallSnapshotRequestString(request));
+    }
+    final InstallSnapshotReplyProto reply;
+    try {
+      reply = installSnapshotImpl(request);
+    } catch (Exception e) {
+      LOG.error("{}: installSnapshot failed", getMemberId(), e);
+      throw e;
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("{}: reply installSnapshot: {}", getMemberId(),
+          ServerStringUtils.toInstallSnapshotReplyString(reply));
+    }
+    return reply;
+  }
+
+  private InstallSnapshotReplyProto 
installSnapshotImpl(InstallSnapshotRequestProto request) throws IOException {
+    final RaftRpcRequestProto r = request.getServerRequest();
+    final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
+    final RaftGroupId leaderGroupId = 
ProtoUtils.toRaftGroupId(r.getRaftGroupId());
+    CodeInjectionForTesting.execute(RaftServerImpl.INSTALL_SNAPSHOT, 
server.getId(), leaderId, request);
+
+    server.assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
+    server.assertGroup(leaderId, leaderGroupId);
+
+    InstallSnapshotReplyProto reply = null;
+    // Check if install snapshot from Leader is enabled
+    if (installSnapshotEnabled) {
+      // Leader has sent InstallSnapshot request with SnapshotInfo. Install 
the snapshot.
+      if (request.hasSnapshotChunk()) {
+        reply = checkAndInstallSnapshot(request, leaderId);
+      }
+    } else {
+      // Leader has only sent a notification to install snapshot. Inform State 
Machine to install snapshot.
+      if (request.hasNotification()) {
+        reply = notifyStateMachineToInstallSnapshot(request, leaderId);
+      }
+    }
+
+    if (reply != null) {
+      if (request.hasLastRaftConfigurationLogEntryProto()) {
+        // Set the configuration included in the snapshot
+        final LogEntryProto proto = 
request.getLastRaftConfigurationLogEntryProto();
+        LOG.info("{}: set new configuration {} from snapshot", getMemberId(), 
proto);
+
+        state.setRaftConf(proto);
+        state.writeRaftConfiguration(proto);
+        server.getStateMachine().event().notifyConfigurationChanged(
+            proto.getTerm(), proto.getIndex(), proto.getConfigurationEntry());
+      }
+      return reply;
+    }
+
+    // There is a mismatch between configurations on leader and follower.
+    final InstallSnapshotReplyProto failedReply = 
ServerProtoUtils.toInstallSnapshotReplyProto(
+        leaderId, getMemberId(), InstallSnapshotResult.CONF_MISMATCH);
+    LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but 
follower {} has it set to {}",
+        getMemberId(), 
RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
+        leaderId, request.hasSnapshotChunk(), server.getId(), 
installSnapshotEnabled);
+    return failedReply;
+  }
+
+  private InstallSnapshotReplyProto 
checkAndInstallSnapshot(InstallSnapshotRequestProto request,
+      RaftPeerId leaderId) throws IOException {
+    final long currentTerm;
+    final long leaderTerm = request.getLeaderTerm();
+    final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest 
= request.getSnapshotChunk();
+    final long lastIncludedIndex = 
snapshotChunkRequest.getTermIndex().getIndex();
+    synchronized (this) {
+      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
+      currentTerm = state.getCurrentTerm();
+      if (!recognized) {
+        final InstallSnapshotReplyProto reply = 
ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+            currentTerm, snapshotChunkRequest.getRequestIndex(), 
InstallSnapshotResult.NOT_LEADER);
+        LOG.warn("{}: Failed to recognize leader for installSnapshot chunk.", 
getMemberId());
+        return reply;
+      }
+      server.changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
+      state.setLeader(leaderId, "installSnapshot");
+
+      
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
+      try {
+        // Check and append the snapshot chunk. We simply put this in lock
+        // considering a follower peer requiring a snapshot installation does 
not
+        // have a lot of requests
+        Preconditions.assertTrue(state.getLog().getNextIndex() <= 
lastIncludedIndex,
+            "%s log's next id is %s, last included index in snapshot is %s",
+            getMemberId(), state.getLog().getNextIndex(), lastIncludedIndex);
+
+        //TODO: We should only update State with installed snapshot once the 
request is done.
+        state.installSnapshot(request);
+
+        // update the committed index
+        // re-load the state machine if this is the last chunk
+        if (snapshotChunkRequest.getDone()) {
+          state.reloadStateMachine(lastIncludedIndex);
+        }
+      } finally {
+        
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
+      }
+    }
+    if (snapshotChunkRequest.getDone()) {
+      LOG.info("{}: successfully install the entire snapshot-{}", 
getMemberId(), lastIncludedIndex);
+    }
+    return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
+        currentTerm, snapshotChunkRequest.getRequestIndex(), 
InstallSnapshotResult.SUCCESS);
+  }
+
+  /**
+   * Handle an installSnapshot notification from the leader by asking the
+   * state machine to install a snapshot on its own.
+   *
+   * The reply carries one of the following results:
+   * <ul>
+   *   <li>NOT_LEADER: the sender is not recognized as the leader for the given term;</li>
+   *   <li>ALREADY_INSTALLED: the local snapshot index already covers the
+   *       leader's first available log index;</li>
+   *   <li>SNAPSHOT_UNAVAILABLE: the state machine completed with a null reply;</li>
+   *   <li>SNAPSHOT_INSTALLED: the state machine reported an installed snapshot index;</li>
+   *   <li>IN_PROGRESS: none of the above, i.e. an installation is still running.</li>
+   * </ul>
+   *
+   * @param request the installSnapshot request carrying the notification
+   * @param leaderId the id of the peer sending the request
+   * @return the reply to be sent back to the leader
+   * @throws IOException if notifying the state machine fails with an exception
+   *         other than an interrupt or a timeout
+   */
+  private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
+      InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
+    final long currentTerm;
+    final long leaderTerm = request.getLeaderTerm();
+    final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
+        request.getNotification().getFirstAvailableTermIndex());
+    final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
+    // NOTE(review): this locks on the handler instance; confirm that it is the
+    // monitor intended to guard the server state touched below.
+    synchronized (this) {
+      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
+      currentTerm = state.getCurrentTerm();
+      if (!recognized) {
+        final InstallSnapshotReplyProto reply = ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+            currentTerm, InstallSnapshotResult.NOT_LEADER, -1);
+        LOG.warn("{}: Failed to recognize leader for installSnapshot notification.", getMemberId());
+        return reply;
+      }
+      server.changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot");
+      state.setLeader(leaderId, "installSnapshot");
+      long snapshotIndex = state.getSnapshotIndex();
+
+      server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
+      // Only the request that flips the in-progress index from 0 gets to notify the state machine.
+      if (inProgressInstallSnapshotIndex.compareAndSet(0, firstAvailableLogIndex)) {
+        LOG.info("{}: Received notification to install snapshot at index {}", getMemberId(), firstAvailableLogIndex);
+        // Check if snapshot index is already at par or ahead of the first
+        // available log index of the Leader.
+        if (snapshotIndex + 1 >= firstAvailableLogIndex && firstAvailableLogIndex > 0) {
+          // State Machine has already installed the snapshot. Return the
+          // latest snapshot index to the Leader.
+
+          inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 0);
+          LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(),
+              InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
+          return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
+              InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
+        }
+
+        final RaftPeerProto leaderProto;
+        if (!request.hasLastRaftConfigurationLogEntryProto()) {
+          leaderProto = null;
+        } else {
+          leaderProto = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry().getPeersList()
+              .stream()
+              .filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId))
+              .findFirst()
+              .orElseThrow(() -> new IllegalArgumentException("Leader " + leaderId
+                  + " not found from the last configuration LogEntryProto, request = " + request));
+        }
+
+        // For the cases where RaftConf is empty on newly started peer with empty peer list,
+        // we retrieve leader info from installSnapShotRequestProto.
+        final RoleInfoProto proto = leaderProto == null || server.getRaftConf().getPeer(state.getLeaderId()) != null?
+            server.getRoleInfoProto(): getRoleInfoProto(ProtoUtils.toRaftPeer(leaderProto));
+        // This is the first installSnapshot notify request for this term and
+        // index. Notify the state machine to install the snapshot.
+        LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.",
+            getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex);
+        try {
+          server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto, firstAvailableLogTermIndex)
+              .whenComplete((reply, exception) -> {
+                if (exception != null) {
+                  LOG.warn("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}",
+                      getMemberId(), exception.getMessage());
+                  inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 0);
+                  return;
+                }
+
+                if (reply != null) {
+                  LOG.info("{}: StateMachine successfully installed snapshot index {}. Reloading the StateMachine.",
+                      getMemberId(), reply.getIndex());
+                  server.getStateMachine().pause();
+                  state.updateInstalledSnapshotIndex(reply);
+                  state.reloadStateMachine(reply.getIndex());
+                  installedSnapshotIndex.set(reply.getIndex());
+                } else {
+                  isSnapshotNull.set(true);
+                  if (LOG.isDebugEnabled()) {
+                    // Log the member id like every other message in this method.
+                    LOG.debug("{}: StateMachine could not install snapshot as it is not available", getMemberId());
+                  }
+                }
+              })
+              // wait for 1 second for the state machine to install the snapshot
+              .get(1, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          // Restore the interrupt status so that code further up the stack can
+          // observe it; the reply below is still computed from the current flags.
+          Thread.currentThread().interrupt();
+        } catch (TimeoutException e) {
+          // nothing to do: the installation has not finished within the wait,
+          // the reply below is computed from the current flags.
+        } catch (Exception t) {
+          // there are two cases:
+          //1 `get()` may throw ExecutionException if `whenComplete` throw an exception
+          //2 when generating completeFuture, `statemachine#notifyInstallSnapshotFromLeader`
+          // may throw an uncertain exception, which is determined by the implementation of
+          // user statemachine.
+          inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 0);
+          final String err = getMemberId() + ": Failed to notify StateMachine to InstallSnapshot.";
+          LOG.warn(err + " " + t);
+          throw new IOException(err, t);
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{}: StateMachine is processing Snapshot Installation Request.", getMemberId());
+        }
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{}: StateMachine is already installing a snapshot.", getMemberId());
+        }
+      }
+
+      // If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE.
+      if (isSnapshotNull.compareAndSet(true, false)) {
+        LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
+            InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
+        inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 0);
+        return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+            currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
+      }
+
+      // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset
+      // installedSnapshotIndex to 0.
+      long latestInstalledSnapshotIndex = this.installedSnapshotIndex.getAndSet(0);
+      if (latestInstalledSnapshotIndex > 0) {
+        LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", getMemberId(),
+            InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotIndex);
+        inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 0);
+        return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+            currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotIndex);
+      }
+
+      // Otherwise, Snapshot installation is in progress.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
+            InstallSnapshotResult.IN_PROGRESS);
+      }
+      return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
+          currentTerm, InstallSnapshotResult.IN_PROGRESS, -1);
+    }
+  }
+
+  /**
+   * Build a {@link RoleInfoProto} describing this server, using the given
+   * peer as the leader in the follower info.
+   *
+   * When the current role has no follower state, the elapsed time since the
+   * last RPC and the outstanding-op count both default to zero.
+   *
+   * @param leader the peer to report as the leader
+   * @return the role info of this server with the follower info filled in
+   */
+  private RoleInfoProto getRoleInfoProto(RaftPeer leader) {
+    final RoleInfo roleInfo = server.getRole();
+    final Optional<FollowerState> followerState = roleInfo.getFollowerState();
+
+    // Time elapsed since the last RPC, or 0 when there is no follower state.
+    final long lastRpcElapsedMs = followerState
+        .map(FollowerState::getLastRpcTime)
+        .map(Timestamp::elapsedTimeMs)
+        .orElse(0L);
+    final ServerRpcProto leaderRpc = ServerProtoUtils.toServerRpcProto(leader, lastRpcElapsedMs);
+
+    final int outstandingOp = followerState.map(FollowerState::getOutstandingOp).orElse(0);
+    final FollowerInfoProto.Builder follower = FollowerInfoProto.newBuilder();
+    follower.setLeaderInfo(leaderRpc);
+    follower.setOutstandingOp(outstandingOp);
+
+    final RoleInfoProto.Builder result = RoleInfoProto.newBuilder();
+    result.setSelf(server.getPeer().getRaftPeerProto());
+    result.setRole(roleInfo.getCurrentRole());
+    result.setRoleElapsedTimeMs(roleInfo.getRoleElapsedTimeMs());
+    result.setFollowerInfo(follower);
+    return result.build();
+  }
+}
\ No newline at end of file

Reply via email to