This is an automated email from the ASF dual-hosted git repository.

hanishakoneru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new b735bb5  RATIS-1402. do not send extra rpc calls to follower when the follower is still installing a snapshot (#504)
b735bb5 is described below

commit b735bb520c1e11cf3d19328f3fc31765c4aea5e3
Author: Jackson Yao <[email protected]>
AuthorDate: Wed Oct 6 00:47:45 2021 +0800

    RATIS-1402. do not send extra rpc calls to follower when the follower is still installing a snapshot (#504)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 33 ++++++++++++----------
 .../ratis/InstallSnapshotNotificationTests.java    |  8 +-----
 2 files changed, 19 insertions(+), 22 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 5db2690..62a0676 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.impl;
 
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.ClientProtoUtils;
@@ -74,7 +75,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -166,7 +166,7 @@ class RaftServerImpl implements RaftServer.Division,
   private final LeaderElectionMetrics leaderElectionMetrics;
   private final RaftServerMetricsImpl raftServerMetrics;
 
-  private final AtomicReference<TermIndex> inProgressInstallSnapshotRequest;
+  private final AtomicLong inProgressInstallSnapshotRequest;
   private final AtomicLong installedSnapshotIndex;
   private final AtomicBoolean isSnapshotNull;
 
@@ -196,7 +196,7 @@ class RaftServerImpl implements RaftServer.Division,
 
     this.state = new ServerState(id, group, properties, this, stateMachine);
     this.retryCache = new RetryCacheImpl(properties);
-    this.inProgressInstallSnapshotRequest = new AtomicReference<>(null);
+    this.inProgressInstallSnapshotRequest = new AtomicLong();
     this.installedSnapshotIndex = new AtomicLong();
     this.isSnapshotNull = new AtomicBoolean(false);
     this.dataStreamMap = new DataStreamMapImpl(id);
@@ -1291,8 +1291,8 @@ class RaftServerImpl implements RaftServer.Division,
 
   private long checkInconsistentAppendEntries(TermIndex previous, 
LogEntryProto... entries) {
     // Check if a snapshot installation through state machine is in progress.
-    final TermIndex installSnapshot = inProgressInstallSnapshotRequest.get();
-    if (installSnapshot != null) {
+    final long installSnapshot = inProgressInstallSnapshotRequest.get();
+    if (installSnapshot != 0) {
       LOG.info("{}: Failed appendEntries as snapshot ({}) installation is in 
progress", getMemberId(), installSnapshot);
       return state.getNextIndex();
     }
@@ -1547,7 +1547,7 @@ class RaftServerImpl implements RaftServer.Division,
       long snapshotIndex = state.getSnapshotIndex();
 
       
updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
-      if (inProgressInstallSnapshotRequest.compareAndSet(null, 
firstAvailableLogTermIndex)) {
+      if (inProgressInstallSnapshotRequest.compareAndSet(0, 
firstAvailableLogIndex)) {
         LOG.info("{}: Received notification to install snapshot at index {}", 
getMemberId(), firstAvailableLogIndex);
         // Check if snapshot index is already at par or ahead of the first
         // available log index of the Leader.
@@ -1555,7 +1555,7 @@ class RaftServerImpl implements RaftServer.Division,
           // State Machine has already installed the snapshot. Return the
           // latest snapshot index to the Leader.
 
-          
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, 
null);
+          
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
           LOG.info("{}: InstallSnapshot notification result: {}, current 
snapshot index: {}", getMemberId(),
               InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
           return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(), currentTerm,
@@ -1587,7 +1587,7 @@ class RaftServerImpl implements RaftServer.Division,
                 if (exception != null) {
                   LOG.warn("{}: Failed to notify StateMachine to 
InstallSnapshot. Exception: {}",
                       getMemberId(), exception.getMessage());
-                  
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, 
null);
+                  
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
                   return;
                 }
 
@@ -1604,11 +1604,14 @@ class RaftServerImpl implements RaftServer.Division,
                     LOG.debug("{}: StateMachine could not install snapshot as 
it is not available", this);
                   }
                 }
-              });
-        } catch (Throwable t) {
-          
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, 
null);
-          throw t;
-        }
+              // wait for 10 seconds for statemachine to install snapshot
+              }).get(1, TimeUnit.SECONDS);
+        } catch (ExecutionException t) {
+          
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
+          LOG.warn("{}: Failed to notify StateMachine to InstallSnapshot. 
ExecutionException: {}",
+              getMemberId(), t.getMessage());
+          throw new IOException("Failed to install snapshot");
+        } catch (InterruptedException | TimeoutException t) {}
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: StateMachine is processing Snapshot Installation 
Request.", getMemberId());
@@ -1623,7 +1626,7 @@ class RaftServerImpl implements RaftServer.Division,
       if (isSnapshotNull.compareAndSet(true, false)) {
         LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
             InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
-        
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, 
null);
+        inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 
0);
         return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
       }
@@ -1634,7 +1637,7 @@ class RaftServerImpl implements RaftServer.Division,
       if (latestInstalledSnapshotIndex > 0) {
         LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", 
getMemberId(),
             InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotIndex);
-        
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, 
null);
+        inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 
0);
         return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotIndex);
       }
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 5f7f4a7..b307e81 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -350,13 +350,7 @@ public abstract class 
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
       cluster.shutdown();
     }
   }
-
-  /**
-   * Basic test for install snapshot notification: start a one node cluster
-   * (disable install snapshot option) and let it generate a snapshot. Then
-   * delete the log and restart the node, and add more nodes as followers.
-   * The new follower nodes should get a install snapshot notification.
-   */
+  
   /**
    * Test for install snapshot during a peer bootstrap: start a one node 
cluster
    * (disable install snapshot option) and let it generate a snapshot. Add

Reply via email to