Repository: incubator-ratis Updated Branches: refs/heads/master 60d0bc163 -> ce783995f
RATIS-363. StateMachineUpdater should wait for committed transactions to be applied before shutdown. Contributed by Jitendra Nath Pandey Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ce783995 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ce783995 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ce783995 Branch: refs/heads/master Commit: ce783995f6612fa76f24a59c23e5964a4bf434a1 Parents: 60d0bc1 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Mon Oct 22 18:00:25 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Mon Oct 22 18:00:25 2018 +0800 ---------------------------------------------------------------------- .../grpc/TestStateMachineShutdownWithGrpc.java | 26 ++++ .../apache/ratis/server/impl/ServerState.java | 7 +- .../ratis/server/impl/StateMachineUpdater.java | 49 +++++-- .../server/impl/StateMachineShutdownTests.java | 130 +++++++++++++++++++ 4 files changed, 203 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce783995/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestStateMachineShutdownWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestStateMachineShutdownWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestStateMachineShutdownWithGrpc.java new file mode 100644 index 0000000..d8d1d0d --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestStateMachineShutdownWithGrpc.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.server.impl.StateMachineShutdownTests; + +public class TestStateMachineShutdownWithGrpc + extends StateMachineShutdownTests<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet{ + +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce783995/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index f371c5f..49169f5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -372,7 +372,12 @@ public class ServerState implements Closeable { @Override public void close() throws IOException { - stateMachineUpdater.stop(); + try { + stateMachineUpdater.stopAndJoin(); + } catch (InterruptedException e) { + LOG.warn(getSelfId() + + ": Interrupted when joining stateMachineUpdater", e); + } LOG.info("{} closes. The last applied log index is {}", getSelfId(), getLastAppliedIndex()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce783995/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index 02dcb31..d0b7947 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -44,6 +44,7 @@ import java.util.concurrent.CompletableFuture; */ class StateMachineUpdater implements Runnable { static final Logger LOG = LoggerFactory.getLogger(StateMachineUpdater.class); + private volatile long stopIndex = -1; enum State { RUNNING, STOP, RELOAD @@ -82,15 +83,34 @@ class StateMachineUpdater implements Runnable { updater.start(); } - void stop() { + private void stop() { state = State.STOP; - updater.interrupt(); try { stateMachine.close(); } catch (IOException ignored) { + LOG.warn(server.getId() + ": Failed to close " + + stateMachine.getClass().getSimpleName() + + " " + stateMachine, ignored); } } + /** + * Stop the updater thread after all the committed transactions + * have been applied to the state machine. + * + * @throws InterruptedException + */ + public void stopAndJoin() + throws InterruptedException { + if (stopIndex == -1) { + synchronized (this) { + this.stopIndex = raftLog.getLastCommittedIndex(); + notifyUpdater(); + } + } + updater.join(); + } + void reloadStateMachine() { state = State.RELOAD; notifyUpdater(); @@ -113,13 +133,14 @@ class StateMachineUpdater implements Runnable { // when the peers just start, the committedIndex is initialized as 0 // and will be updated only after the leader contacts other peers. // Thus initially lastAppliedIndex can be greater than lastCommitted. - while (lastAppliedIndex >= raftLog.getLastCommittedIndex()) { + while (lastAppliedIndex >= raftLog.getLastCommittedIndex() + && !shouldStop()) { wait(); } } final long committedIndex = raftLog.getLastCommittedIndex(); - Preconditions.assertTrue(lastAppliedIndex < committedIndex); + Preconditions.assertTrue(lastAppliedIndex <= committedIndex); if (state == State.RELOAD) { Preconditions.assertTrue(stateMachine.getLifeCycleState() == LifeCycle.State.PAUSED); @@ -158,7 +179,7 @@ class StateMachineUpdater implements Runnable { } // check if need to trigger a snapshot - if (shouldTakeSnapshot(lastAppliedIndex)) { + if (shouldTakeSnapshot()) { if (futures.isInitialized()) { JavaUtils.allOf(futures.get()).get(); } @@ -166,6 +187,10 @@ class StateMachineUpdater implements Runnable { // TODO purge logs, including log cache. but should keep log for leader's RPCSenders lastSnapshotIndex = lastAppliedIndex; } + + if (shouldStop()) { + stop(); + } } catch (InterruptedException e) { if (!isRunning()) { LOG.info("{}: the StateMachineUpdater is interrupted and will exit.", this); @@ -184,9 +209,17 @@ class StateMachineUpdater implements Runnable { return state != State.STOP; } - private boolean shouldTakeSnapshot(long currentAppliedIndex) { - return autoSnapshotEnabled && (state != State.RELOAD) && - (currentAppliedIndex - lastSnapshotIndex >= autoSnapshotThreshold); + private boolean shouldStop() { + return stopIndex > -1 && getLastAppliedIndex() >= stopIndex; + } + + private boolean shouldTakeSnapshot() { + return autoSnapshotEnabled && + ( ((state != State.RELOAD) + && (getLastAppliedIndex() - lastSnapshotIndex + >= autoSnapshotThreshold)) + || shouldStop() + ); } long getLastAppliedIndex() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce783995/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java new file mode 100644 index 0000000..a66cf70 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.server.impl; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.protocol.*; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.statemachine.TransactionContext; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + + +public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster> + extends BaseTest + implements MiniRaftCluster.Factory.Get<CLUSTER> { + + protected static class StateMachineWithConditionalWait extends + SimpleStateMachine4Testing { + + Long objectToWait = new Long(0); + volatile boolean blockOnApply = true; + + @Override + public CompletableFuture<Message> applyTransaction(TransactionContext trx) { + CompletableFuture<Message> future = new CompletableFuture<Message>(); + if (blockOnApply) { + synchronized (objectToWait) { + try { + objectToWait.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(); + } + } + } + future.complete(new RaftTestUtil.SimpleMessage("done")); + return future; + } + + public void unBlockApplyTxn() { + blockOnApply = false; + synchronized (objectToWait) { + objectToWait.notifyAll(); + } + } + } + + @Test + public void testStateMachineShutdownWaitsForApplyTxn() throws Exception { + final RaftProperties prop = getProperties(); + prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + StateMachineWithConditionalWait.class, StateMachine.class); + final MiniRaftCluster cluster = newCluster(3); + cluster.start(); + RaftTestUtil.waitForLeader(cluster); + RaftServerImpl leader = cluster.getLeader(); + RaftPeerId leaderId = leader.getId(); + + //Unblock leader and one follower + ((StateMachineWithConditionalWait)leader.getStateMachine()) + .unBlockApplyTxn(); + ((StateMachineWithConditionalWait)cluster. + getFollowers().get(0).getStateMachine()).unBlockApplyTxn(); + + cluster.getLeaderAndSendFirstMessage(true); + + final RaftClient client = cluster.createClient(leaderId); + client.send(new RaftTestUtil.SimpleMessage("message")); + RaftClientReply reply = client.send( + new RaftTestUtil.SimpleMessage("message2")); + + long logIndex = reply.getLogIndex(); + //Confirm that followers have committed + RaftClientReply watchReply = client.sendWatch( + logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED); + watchReply.getCommitInfos().forEach( + val -> Assert.assertEquals(val.getCommitIndex(), logIndex)); + + RaftServerImpl secondFollower = cluster.getFollowers().get(1); + // Second follower is blocked in apply transaction + Assert.assertTrue( + secondFollower.getState().getLastAppliedIndex() + < logIndex); + + // Now shutdown the follower in a separate thread + Thread t = new Thread(() -> secondFollower.shutdown(true)); + t.start(); + + // The second follower should still be blocked in apply transaction + Assert.assertTrue( + secondFollower.getState().getLastAppliedIndex() + < logIndex); + + // Now unblock the second follower + ((StateMachineWithConditionalWait)secondFollower.getStateMachine()) + .unBlockApplyTxn(); + + // Now wait for the thread + t.join(5000); + Assert.assertEquals( + secondFollower.getState().getLastAppliedIndex(), + logIndex); + + client.close(); + cluster.shutdown(); + } +}
