This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git

commit 48239a7a2df06af3c3b4fb61932b83a8c205d7fd
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Aug 30 14:04:29 2019 -0700

    RATIS-659. StateMachineUpdater#stopAndJoin might not take snapshot due to 
race condition.  Contributed by Lokesh Jain
---
 .../ratis/server/impl/StateMachineUpdater.java     | 26 +++++++++++++---------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index d17f10b..8553a01 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.LongStream;
@@ -151,17 +152,10 @@ class StateMachineUpdater implements Runnable {
         }
 
         final MemoizedSupplier<List<CompletableFuture<Message>>> futures = 
applyLog();
-
-        // check if need to trigger a snapshot
-        if (shouldTakeSnapshot()) {
-          if (futures.isInitialized()) {
-            JavaUtils.allOf(futures.get()).get();
-          }
-
-          takeSnapshot();
-        }
+        checkAndTakeSnapshot(futures);
 
         if (shouldStop()) {
+          checkAndTakeSnapshot(futures);
           stop();
         }
       } catch (InterruptedException e) {
@@ -231,6 +225,18 @@ class StateMachineUpdater implements Runnable {
     return futures;
   }
 
+  private void 
checkAndTakeSnapshot(MemoizedSupplier<List<CompletableFuture<Message>>> futures)
+      throws ExecutionException, InterruptedException {
+    // check if need to trigger a snapshot
+    if (shouldTakeSnapshot()) {
+      if (futures.isInitialized()) {
+        JavaUtils.allOf(futures.get()).get();
+      }
+
+      takeSnapshot();
+    }
+  }
+
   private void takeSnapshot() {
     final long i;
     try {
@@ -266,7 +272,7 @@ class StateMachineUpdater implements Runnable {
     if (autoSnapshotThreshold == null) {
       return false;
     } else if (shouldStop()) {
-      return true;
+      return getLastAppliedIndex() - snapshotIndex.get() > 0;
     }
     return state == State.RUNNING && getLastAppliedIndex() - 
snapshotIndex.get() >= autoSnapshotThreshold;
   }

Reply via email to