Repository: incubator-ratis
Updated Branches:
  refs/heads/master 60d0bc163 -> ce783995f


RATIS-363. StateMachineUpdater should wait for committed transactions to be 
applied before shutdown.  Contributed by Jitendra Nath Pandey


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ce783995
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ce783995
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ce783995

Branch: refs/heads/master
Commit: ce783995f6612fa76f24a59c23e5964a4bf434a1
Parents: 60d0bc1
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Mon Oct 22 18:00:25 2018 +0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Mon Oct 22 18:00:25 2018 +0800

----------------------------------------------------------------------
 .../grpc/TestStateMachineShutdownWithGrpc.java  |  26 ++++
 .../apache/ratis/server/impl/ServerState.java   |   7 +-
 .../ratis/server/impl/StateMachineUpdater.java  |  49 +++++--
 .../server/impl/StateMachineShutdownTests.java  | 130 +++++++++++++++++++
 4 files changed, 203 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce783995/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestStateMachineShutdownWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestStateMachineShutdownWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestStateMachineShutdownWithGrpc.java
new file mode 100644
index 0000000..d8d1d0d
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestStateMachineShutdownWithGrpc.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.server.impl.StateMachineShutdownTests;
+
+public class TestStateMachineShutdownWithGrpc
+    extends StateMachineShutdownTests<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet{
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce783995/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index f371c5f..49169f5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -372,7 +372,12 @@ public class ServerState implements Closeable {
 
   @Override
   public void close() throws IOException {
-    stateMachineUpdater.stop();
+    try {
+      stateMachineUpdater.stopAndJoin();
+    } catch (InterruptedException e) {
+      LOG.warn(getSelfId() +
+          ": Interrupted when joining stateMachineUpdater", e);
+    }
     LOG.info("{} closes. The last applied log index is {}",
         getSelfId(), getLastAppliedIndex());
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce783995/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 02dcb31..d0b7947 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -44,6 +44,7 @@ import java.util.concurrent.CompletableFuture;
  */
 class StateMachineUpdater implements Runnable {
   static final Logger LOG = LoggerFactory.getLogger(StateMachineUpdater.class);
+  private volatile long stopIndex = -1;
 
   enum State {
     RUNNING, STOP, RELOAD
@@ -82,15 +83,34 @@ class StateMachineUpdater implements Runnable {
     updater.start();
   }
 
-  void stop() {
+  private void stop() {
     state = State.STOP;
-    updater.interrupt();
     try {
       stateMachine.close();
     } catch (IOException ignored) {
+      LOG.warn(server.getId() + ": Failed to close "
+          + stateMachine.getClass().getSimpleName()
+          + " " + stateMachine, ignored);
     }
   }
 
+  /**
+   * Stop the updater thread after all the committed transactions
+   * have been applied to the state machine.
+   *
+   * @throws InterruptedException
+   */
+  public void stopAndJoin()
+      throws InterruptedException {
+    if (stopIndex == -1) {
+      synchronized (this) {
+        this.stopIndex = raftLog.getLastCommittedIndex();
+        notifyUpdater();
+      }
+    }
+    updater.join();
+  }
+
   void reloadStateMachine() {
     state = State.RELOAD;
     notifyUpdater();
@@ -113,13 +133,14 @@ class StateMachineUpdater implements Runnable {
           // when the peers just start, the committedIndex is initialized as 0
           // and will be updated only after the leader contacts other peers.
           // Thus initially lastAppliedIndex can be greater than lastCommitted.
-          while (lastAppliedIndex >= raftLog.getLastCommittedIndex()) {
+          while (lastAppliedIndex >= raftLog.getLastCommittedIndex()
+              && !shouldStop()) {
             wait();
           }
         }
 
         final long committedIndex = raftLog.getLastCommittedIndex();
-        Preconditions.assertTrue(lastAppliedIndex < committedIndex);
+        Preconditions.assertTrue(lastAppliedIndex <= committedIndex);
 
         if (state == State.RELOAD) {
           Preconditions.assertTrue(stateMachine.getLifeCycleState() == LifeCycle.State.PAUSED);
@@ -158,7 +179,7 @@ class StateMachineUpdater implements Runnable {
         }
 
         // check if need to trigger a snapshot
-        if (shouldTakeSnapshot(lastAppliedIndex)) {
+        if (shouldTakeSnapshot()) {
           if (futures.isInitialized()) {
             JavaUtils.allOf(futures.get()).get();
           }
@@ -166,6 +187,10 @@ class StateMachineUpdater implements Runnable {
           // TODO purge logs, including log cache. but should keep log for leader's RPCSenders
           lastSnapshotIndex = lastAppliedIndex;
         }
+
+        if (shouldStop()) {
+          stop();
+        }
       } catch (InterruptedException e) {
         if (!isRunning()) {
           LOG.info("{}: the StateMachineUpdater is interrupted and will exit.", this);
@@ -184,9 +209,17 @@ class StateMachineUpdater implements Runnable {
     return state != State.STOP;
   }
 
-  private boolean shouldTakeSnapshot(long currentAppliedIndex) {
-    return autoSnapshotEnabled && (state != State.RELOAD) &&
-        (currentAppliedIndex - lastSnapshotIndex >= autoSnapshotThreshold);
+  private boolean shouldStop() {
+    return stopIndex > -1 && getLastAppliedIndex() >= stopIndex;
+  }
+
+  private boolean shouldTakeSnapshot() {
+    return autoSnapshotEnabled &&
+        ( ((state != State.RELOAD)
+            && (getLastAppliedIndex() - lastSnapshotIndex
+              >= autoSnapshotThreshold))
+          || shouldStop()
+        );
   }
 
   long getLastAppliedIndex() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce783995/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
new file mode 100644
index 0000000..a66cf70
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+
+public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  protected static class StateMachineWithConditionalWait extends
+      SimpleStateMachine4Testing {
+
+    Long objectToWait = new Long(0);
+    volatile boolean blockOnApply = true;
+
+    @Override
+    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+      CompletableFuture<Message> future = new CompletableFuture<Message>();
+      if (blockOnApply) {
+        synchronized (objectToWait) {
+          try {
+            objectToWait.wait();
+          } catch (InterruptedException e) {
+            throw new RuntimeException();
+          }
+        }
+      }
+      future.complete(new RaftTestUtil.SimpleMessage("done"));
+      return future;
+    }
+
+    public void unBlockApplyTxn() {
+      blockOnApply = false;
+      synchronized (objectToWait) {
+        objectToWait.notifyAll();
+      }
+    }
+  }
+
+  @Test
+  public void testStateMachineShutdownWaitsForApplyTxn() throws Exception {
+    final RaftProperties prop = getProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        StateMachineWithConditionalWait.class, StateMachine.class);
+    final MiniRaftCluster cluster = newCluster(3);
+    cluster.start();
+    RaftTestUtil.waitForLeader(cluster);
+    RaftServerImpl leader = cluster.getLeader();
+    RaftPeerId leaderId = leader.getId();
+
+    //Unblock leader and one follower
+    ((StateMachineWithConditionalWait)leader.getStateMachine())
+        .unBlockApplyTxn();
+    ((StateMachineWithConditionalWait)cluster.
+        getFollowers().get(0).getStateMachine()).unBlockApplyTxn();
+
+    cluster.getLeaderAndSendFirstMessage(true);
+
+    final RaftClient client = cluster.createClient(leaderId);
+    client.send(new RaftTestUtil.SimpleMessage("message"));
+    RaftClientReply reply = client.send(
+        new RaftTestUtil.SimpleMessage("message2"));
+
+    long logIndex = reply.getLogIndex();
+    //Confirm that followers have committed
+    RaftClientReply watchReply = client.sendWatch(
+        logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED);
+    watchReply.getCommitInfos().forEach(
+        val -> Assert.assertEquals(val.getCommitIndex(), logIndex));
+
+    RaftServerImpl secondFollower = cluster.getFollowers().get(1);
+    // Second follower is blocked in apply transaction
+    Assert.assertTrue(
+        secondFollower.getState().getLastAppliedIndex()
+            < logIndex);
+
+    // Now shutdown the follower in a separate thread
+    Thread t = new Thread(() -> secondFollower.shutdown(true));
+    t.start();
+
+    // The second follower should still be blocked in apply transaction
+    Assert.assertTrue(
+        secondFollower.getState().getLastAppliedIndex()
+            < logIndex);
+
+    // Now unblock the second follower
+    ((StateMachineWithConditionalWait)secondFollower.getStateMachine())
+        .unBlockApplyTxn();
+
+    // Now wait for the thread
+    t.join(5000);
+    Assert.assertEquals(
+        secondFollower.getState().getLastAppliedIndex(),
+        logIndex);
+
+    client.close();
+    cluster.shutdown();
+  }
+}

Reply via email to