This is an automated email from the ASF dual-hosted git repository. nwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new ea96a34 [STREAMCOMP-2885] broadcast GCC completition (#3330) ea96a34 is described below commit ea96a34585e4e02e014277a06b4ba7b4b37a9317 Author: Lawrence Pan <lawrencefei...@gmail.com> AuthorDate: Fri Sep 27 12:50:44 2019 -0400 [STREAMCOMP-2885] broadcast GCC completition (#3330) * amend gitignore * decouple GCC post-save logic from stateful controller * tmaster broadcast checkpoint completion: the message will first be sent to all stmgrs, and then each stmgr will forward it to every heron instance it connects to. * heron instance handles global checkpoint saved msg * expose necessary API for the 2PC support; unit tests for this will be in the following commit * test: refactor mock physical plan to a builder * test preSave and postSave hook * [STREAMCOMP-2916] block execute on postSave * add stateful tests for spouts * test preRestore * rename interface to ITwoPhaseStatefulComponent * block spout from reading data tuples as well * fix log format * do not check interface: the boolean will only be set to true for tasks that implement ITwoPhaseStatefulComponent, so there is no need to do the check again. 
--- .gitignore | 5 + .../api/topology/ITwoPhaseStatefulComponent.java | 67 ++++++ .../java/org/apache/heron/instance/IInstance.java | 15 ++ .../apache/heron/instance/InstanceControlMsg.java | 13 + .../src/java/org/apache/heron/instance/Slave.java | 24 +- .../apache/heron/instance/bolt/BoltInstance.java | 28 ++- .../apache/heron/instance/spout/SpoutInstance.java | 38 ++- .../apache/heron/network/StreamManagerClient.java | 14 ++ heron/instance/tests/java/BUILD | 2 + .../instance/bolt/BoltStatefulInstanceTest.java | 261 +++++++++++++++++++++ .../instance/spout/SpoutStatefulInstanceTest.java | 243 +++++++++++++++++++ .../java/org/apache/heron/resource/Constants.java | 6 + .../heron/resource/MockPhysicalPlansBuilder.java | 155 ++++++++++++ .../apache/heron/resource/TestStatefulBolt.java | 69 ++++++ .../apache/heron/resource/TestStatefulSpout.java | 76 ++++++ .../heron/resource/TestTwoPhaseStatefulBolt.java | 98 ++++++++ .../heron/resource/TestTwoPhaseStatefulSpout.java | 97 ++++++++ .../org/apache/heron/resource/UnitTestHelper.java | 164 ++++++------- heron/proto/ckptmgr.proto | 6 + heron/stmgr/src/cpp/manager/instance-server.cpp | 9 + heron/stmgr/src/cpp/manager/instance-server.h | 3 + heron/stmgr/src/cpp/manager/stmgr.cpp | 14 +- heron/stmgr/src/cpp/manager/stmgr.h | 4 + heron/stmgr/src/cpp/manager/tmaster-client.cpp | 11 +- heron/stmgr/src/cpp/manager/tmaster-client.h | 10 +- .../src/cpp/manager/stateful-controller.cpp | 24 +- .../tmaster/src/cpp/manager/stateful-controller.h | 5 +- heron/tmaster/src/cpp/manager/stmgrstate.cpp | 9 +- heron/tmaster/src/cpp/manager/stmgrstate.h | 2 + heron/tmaster/src/cpp/manager/tmaster.cpp | 22 +- heron/tmaster/src/cpp/manager/tmaster.h | 3 +- 31 files changed, 1387 insertions(+), 110 deletions(-) diff --git a/.gitignore b/.gitignore index 3b81340..c599612 100644 --- a/.gitignore +++ b/.gitignore @@ -90,6 +90,10 @@ tools/jdk/ijar .idea/sqlDataSources.xml .idea/dynamic.xml +# Intellij bazel plugin +.ijwb +.clwb + ## Maven generated 
files .classpath.txt @@ -134,6 +138,7 @@ website2/website/static/api # Visual Studio Code .vscode +vs.code-workspace # integration_test results/ diff --git a/heron/api/src/java/org/apache/heron/api/topology/ITwoPhaseStatefulComponent.java b/heron/api/src/java/org/apache/heron/api/topology/ITwoPhaseStatefulComponent.java new file mode 100644 index 0000000..6dc88c3 --- /dev/null +++ b/heron/api/src/java/org/apache/heron/api/topology/ITwoPhaseStatefulComponent.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.heron.api.topology; + +import java.io.Serializable; + +/** + * Defines a stateful component that is aware of Heron topology's "two-phase commit". + * + * Note tasks saving a distributed checkpoint would be the "prepare" phase of the two-phase commit + * algorithm. When a distributed checkpoint is done, we can say that all tasks agree that they + * will not roll back to the time before that distributed checkpoint, and the "prepare" phase is + * complete. + * + * When the "prepare" phase is complete, Heron will invoke the "postSave" hook to signal the + * beginning of the "commit" phase. 
If there is a failure occurred during the "prepare" phase, + * Heron will invoke the hook "preRestore" to signal that two-phase commit is aborted, and the + * topology will be rolled back to the previous checkpoint. + * + * Note that the commit phase will finish after the postSave hook exits successfully. Then, the + * prepare phase of the following checkpoint will begin. + * + * In addition, for two-phase stateful components specifically, Heron will not execute (for bolts) + * or produce (for spouts) tuples between preSave and postSave. This will guarantee that the prepare + * phase of the next checkpoint will not overlap with the commit phase of the current checkpoint + * (eg. we block execution of tuples from the next checkpoint unless commit phase is done). + * + * See the end-to-end effectively-once designed doc (linked in the PR of this commit) for more + * details. + */ +public interface ITwoPhaseStatefulComponent<K extends Serializable, V extends Serializable> + extends IStatefulComponent<K, V> { + + /** + * This is a hook for the component to perform some actions after a checkpoint is persisted + * successfully for all components in the topology. + * + * @param checkpointId the ID of the checkpoint + */ + void postSave(String checkpointId); + + /** + * This is a hook for the component to perform some actions (eg. state clean-up) before the + * framework attempts to delete the component and restore it to a previously-saved checkpoint. 
+ * + * @param checkpointId the ID of the checkpoint that the component is being restored to + */ + void preRestore(String checkpointId); + +} diff --git a/heron/instance/src/java/org/apache/heron/instance/IInstance.java b/heron/instance/src/java/org/apache/heron/instance/IInstance.java index de7fbd0..9671a1a 100644 --- a/heron/instance/src/java/org/apache/heron/instance/IInstance.java +++ b/heron/instance/src/java/org/apache/heron/instance/IInstance.java @@ -58,6 +58,21 @@ public interface IInstance { void clean(); /** + * Inform the Instance that the framework will clean, stop, and delete the instance + * in order to restore its state to a previously-saved checkpoint. + * + * @param checkpointId the ID of the checkpoint the instance will be restoring to + */ + void preRestore(String checkpointId); + + /** + * Inform the Instance that a particular checkpoint has become globally consistent + * + * @param checkpointId the ID of the checkpoint that became globally consistent + */ + void onCheckpointSaved(String checkpointId); + + /** * Destroy the whole IInstance. * Notice: It should only be called when the whole program is * exiting. And in fact, this method should never be called. 
diff --git a/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java b/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java index 789f53b..c58c384 100644 --- a/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java +++ b/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java @@ -26,11 +26,13 @@ public final class InstanceControlMsg { private PhysicalPlanHelper newPhysicalPlanHelper; private CheckpointManager.RestoreInstanceStateRequest restoreInstanceStateRequest; private CheckpointManager.StartInstanceStatefulProcessing startInstanceStatefulProcessing; + private CheckpointManager.StatefulConsistentCheckpointSaved statefulConsistentCheckpointSaved; private InstanceControlMsg(Builder builder) { this.newPhysicalPlanHelper = builder.newPhysicalPlanHelper; this.restoreInstanceStateRequest = builder.restoreInstanceStateRequest; this.startInstanceStatefulProcessing = builder.startInstanceStatefulProcessing; + this.statefulConsistentCheckpointSaved = builder.statefulConsistentCheckpointSaved; } public static Builder newBuilder() { @@ -41,6 +43,10 @@ public final class InstanceControlMsg { return newPhysicalPlanHelper; } + public CheckpointManager.StatefulConsistentCheckpointSaved getStatefulCheckpointSavedMessage() { + return this.statefulConsistentCheckpointSaved; + } + public boolean isNewPhysicalPlanHelper() { return newPhysicalPlanHelper != null; } @@ -65,6 +71,7 @@ public final class InstanceControlMsg { private PhysicalPlanHelper newPhysicalPlanHelper; private CheckpointManager.RestoreInstanceStateRequest restoreInstanceStateRequest; private CheckpointManager.StartInstanceStatefulProcessing startInstanceStatefulProcessing; + private CheckpointManager.StatefulConsistentCheckpointSaved statefulConsistentCheckpointSaved; private Builder() { @@ -87,6 +94,12 @@ public final class InstanceControlMsg { return this; } + public Builder setStatefulCheckpointSaved( + 
CheckpointManager.StatefulConsistentCheckpointSaved message) { + this.statefulConsistentCheckpointSaved = message; + return this; + } + public InstanceControlMsg build() { return new InstanceControlMsg(this); } diff --git a/heron/instance/src/java/org/apache/heron/instance/Slave.java b/heron/instance/src/java/org/apache/heron/instance/Slave.java index 5b85fc6..4e57772 100644 --- a/heron/instance/src/java/org/apache/heron/instance/Slave.java +++ b/heron/instance/src/java/org/apache/heron/instance/Slave.java @@ -123,6 +123,16 @@ public class Slave implements Runnable, AutoCloseable { if (instanceControlMsg.isNewPhysicalPlanHelper()) { handleNewPhysicalPlan(instanceControlMsg); } + + // When a checkpoint becomes "globally consistent" + if (instanceControlMsg.getStatefulCheckpointSavedMessage() != null) { + String checkpointId = instanceControlMsg + .getStatefulCheckpointSavedMessage() + .getConsistentCheckpoint() + .getCheckpointId(); + + handleGlobalCheckpointConsistent(checkpointId); + } } } }; @@ -130,6 +140,11 @@ public class Slave implements Runnable, AutoCloseable { slaveLooper.addTasksOnWakeup(handleControlMessageTask); } + private void handleGlobalCheckpointConsistent(String checkpointId) { + LOG.log(Level.INFO, "checkpoint: {0} has become globally consistent", checkpointId); + instance.onCheckpointSaved(checkpointId); + } + private void resetCurrentAssignment() { helper.setTopologyContext(metricsCollector); instance = helper.getMySpout() != null @@ -262,7 +277,7 @@ public class Slave implements Runnable, AutoCloseable { startInstanceIfNeeded(); } - private void cleanAndStopSlave() { + private void cleanAndStopSlaveBeforeRestore(String checkpointId) { // Clear all queues streamInCommunicator.clear(); streamOutCommunicator.clear(); @@ -275,6 +290,7 @@ public class Slave implements Runnable, AutoCloseable { slaveLooper.clearTimers(); if (instance != null) { + instance.preRestore(checkpointId); instance.clean(); } @@ -294,9 +310,13 @@ public class Slave 
implements Runnable, AutoCloseable { private void handleRestoreInstanceStateRequest(InstanceControlMsg instanceControlMsg) { CheckpointManager.RestoreInstanceStateRequest request = instanceControlMsg.getRestoreInstanceStateRequest(); + + // ID of the checkpoint we are restoring to + String checkpointId = request.getState().getCheckpointId(); + // Clean buffers and unregister tasks in slave looper if (isInstanceStarted) { - cleanAndStopSlave(); + cleanAndStopSlaveBeforeRestore(checkpointId); } // Restore the state diff --git a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java index 9cc177a..a53ed3d 100644 --- a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java +++ b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java @@ -37,6 +37,7 @@ import org.apache.heron.api.metric.GlobalMetrics; import org.apache.heron.api.serializer.IPluggableSerializer; import org.apache.heron.api.state.State; import org.apache.heron.api.topology.IStatefulComponent; +import org.apache.heron.api.topology.ITwoPhaseStatefulComponent; import org.apache.heron.api.topology.IUpdatable; import org.apache.heron.api.utils.Utils; import org.apache.heron.common.basics.Communicator; @@ -74,6 +75,9 @@ public class BoltInstance implements IInstance { private State<Serializable, Serializable> instanceState; + // default to false, can only be toggled to true if bolt implements ITwoPhaseStatefulComponent + private boolean waitingForCheckpointSaved; + // The reference to topology's config private final Map<String, Object> config; @@ -110,6 +114,8 @@ public class BoltInstance implements IInstance { String.valueOf(config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE_LOCATION)), helper.getMyInstanceId()); + this.waitingForCheckpointSaved = false; + if (helper.getMyBolt() == null) { throw new RuntimeException("HeronBoltInstance has no bolt in physical plan."); } @@ -162,10 +168,12 
@@ public class BoltInstance implements IInstance { // so that topology emit, ack, fail are thread safe collector.lock.lock(); try { - // Checkpoint if (bolt instanceof IStatefulComponent) { ((IStatefulComponent) bolt).preSave(checkpointId); } + if (bolt instanceof ITwoPhaseStatefulComponent) { + waitingForCheckpointSaved = true; + } collector.sendOutState(instanceState, checkpointId, spillState, spillStateLocation); } finally { collector.lock.unlock(); @@ -214,6 +222,21 @@ public class BoltInstance implements IInstance { } @Override + public void preRestore(String checkpointId) { + if (bolt instanceof ITwoPhaseStatefulComponent) { + ((ITwoPhaseStatefulComponent) bolt).preRestore(checkpointId); + } + } + + @Override + public void onCheckpointSaved(String checkpointId) { + if (bolt instanceof ITwoPhaseStatefulComponent) { + ((ITwoPhaseStatefulComponent) bolt).postSave(checkpointId); + waitingForCheckpointSaved = false; + } + } + + @Override public void clean() { // Invoke clean up hook before clean() is called helper.getTopologyContext().invokeHookCleanup(); @@ -238,6 +261,7 @@ public class BoltInstance implements IInstance { @Override public void run() { boltMetrics.updateTaskRunCount(); + // Back-pressure -- only when we could send out tuples will we read & execute tuples if (collector.isOutQueuesAvailable()) { boltMetrics.updateExecutionCount(); @@ -271,7 +295,7 @@ public class BoltInstance implements IInstance { long startOfCycle = System.nanoTime(); // Read data from in Queues - while (!inQueue.isEmpty()) { + while (!inQueue.isEmpty() && !waitingForCheckpointSaved) { Message msg = inQueue.poll(); if (msg instanceof CheckpointManager.InitiateStatefulCheckpoint) { diff --git a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java index 53675c4..e9a0e13 100644 --- a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java +++ 
b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java @@ -36,6 +36,7 @@ import org.apache.heron.api.spout.ISpout; import org.apache.heron.api.spout.SpoutOutputCollector; import org.apache.heron.api.state.State; import org.apache.heron.api.topology.IStatefulComponent; +import org.apache.heron.api.topology.ITwoPhaseStatefulComponent; import org.apache.heron.api.topology.IUpdatable; import org.apache.heron.api.utils.Utils; import org.apache.heron.common.basics.ByteAmount; @@ -71,6 +72,9 @@ public class SpoutInstance implements IInstance { private final boolean spillState; private final String spillStateLocation; + // default to false, can only be toggled to true if spout implements ITwoPhaseStatefulComponent + private boolean waitingForCheckpointSaved; + private State<Serializable, Serializable> instanceState; private final SlaveLooper looper; @@ -110,6 +114,8 @@ public class SpoutInstance implements IInstance { String.valueOf(config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE_LOCATION)), helper.getMyInstanceId()); + this.waitingForCheckpointSaved = false; + LOG.info("Is this topology stateful: " + isTopologyStateful); if (helper.getMySpout() == null) { @@ -171,10 +177,15 @@ public class SpoutInstance implements IInstance { if (spout instanceof IStatefulComponent) { ((IStatefulComponent) spout).preSave(checkpointId); } + + if (spout instanceof ITwoPhaseStatefulComponent) { + waitingForCheckpointSaved = true; + } collector.sendOutState(instanceState, checkpointId, spillState, spillStateLocation); } finally { collector.lock.unlock(); } + LOG.info("State persisted for checkpoint: " + checkpointId); } @@ -219,6 +230,21 @@ public class SpoutInstance implements IInstance { } @Override + public void preRestore(String checkpointId) { + if (spout instanceof ITwoPhaseStatefulComponent) { + ((ITwoPhaseStatefulComponent) spout).preRestore(checkpointId); + } + } + + @Override + public void onCheckpointSaved(String checkpointId) { + if (spout instanceof 
ITwoPhaseStatefulComponent) { + ((ITwoPhaseStatefulComponent) spout).postSave(checkpointId); + waitingForCheckpointSaved = false; + } + } + + @Override public void clean() { // Invoke clean up hook before clean() is called helper.getTopologyContext().invokeHookCleanup(); @@ -257,6 +283,9 @@ public class SpoutInstance implements IInstance { public void run() { spoutMetrics.updateTaskRunCount(); + // Check if we have any message to process anyway + readTuplesAndExecute(streamInQueue); + // Check whether we should produce more tuples if (isProduceTuple()) { spoutMetrics.updateProduceTupleCount(); @@ -272,9 +301,6 @@ public class SpoutInstance implements IInstance { spoutMetrics.updateOutQueueFullCount(); } - // Check if we have any message to process anyway - readTuplesAndExecute(streamInQueue); - if (ackEnabled) { // Update the pending-to-be-acked tuples counts spoutMetrics.updatePendingTuplesCount(collector.numInFlight()); @@ -330,12 +356,14 @@ public class SpoutInstance implements IInstance { * It is allowed in: * 1. Outgoing Stream queue is available * 2. Topology State is RUNNING + * 3. 
If the Spout implements ITwoPhaseStatefulComponent, not waiting for checkpoint saved message * * @return true to allow produceTuple() to be invoked */ private boolean isProduceTuple() { return collector.isOutQueuesAvailable() - && helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING); + && helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING) + && !waitingForCheckpointSaved; } protected void produceTuple() { @@ -435,7 +463,7 @@ public class SpoutInstance implements IInstance { long startOfCycle = System.nanoTime(); Duration spoutAckBatchTime = systemConfig.getInstanceAckBatchTime(); - while (!inQueue.isEmpty()) { + while (!inQueue.isEmpty() && !waitingForCheckpointSaved) { Message msg = inQueue.poll(); if (msg instanceof CheckpointManager.InitiateStatefulCheckpoint) { diff --git a/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java b/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java index 69b2493..fb9212b 100644 --- a/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java +++ b/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java @@ -120,6 +120,7 @@ public class StreamManagerClient extends HeronClient { registerOnMessage(CheckpointManager.InitiateStatefulCheckpoint.newBuilder()); registerOnMessage(CheckpointManager.RestoreInstanceStateRequest.newBuilder()); registerOnMessage(CheckpointManager.StartInstanceStatefulProcessing.newBuilder()); + registerOnMessage(CheckpointManager.StatefulConsistentCheckpointSaved.newBuilder()); } @@ -205,6 +206,8 @@ public class StreamManagerClient extends HeronClient { handleRestoreInstanceStateRequest((CheckpointManager.RestoreInstanceStateRequest) message); } else if (message instanceof CheckpointManager.StartInstanceStatefulProcessing) { handleStartStatefulRequest((CheckpointManager.StartInstanceStatefulProcessing) message); + } else if (message instanceof CheckpointManager.StatefulConsistentCheckpointSaved) { + 
handleCheckpointSaved((CheckpointManager.StatefulConsistentCheckpointSaved) message); } else { throw new RuntimeException("Unknown kind of message received from Stream Manager"); } @@ -286,6 +289,17 @@ public class StreamManagerClient extends HeronClient { inControlQueue.offer(instanceControlMsg); } + private void handleCheckpointSaved( + CheckpointManager.StatefulConsistentCheckpointSaved message) { + LOG.info("Received a StatefulCheckpointSaved message with checkpoint id: " + + message.getConsistentCheckpoint().getCheckpointId()); + + InstanceControlMsg instanceControlMsg = InstanceControlMsg.newBuilder() + .setStatefulCheckpointSaved(message) + .build(); + inControlQueue.offer(instanceControlMsg); + } + private void handleRestoreInstanceStateRequest( CheckpointManager.RestoreInstanceStateRequest request) { LOG.info("Received a RestoreInstanceState request with checkpoint id: " diff --git a/heron/instance/tests/java/BUILD b/heron/instance/tests/java/BUILD index 0c79300..23b2186 100644 --- a/heron/instance/tests/java/BUILD +++ b/heron/instance/tests/java/BUILD @@ -24,8 +24,10 @@ java_tests( "org.apache.heron.grouping.EmitDirectBoltTest", "org.apache.heron.grouping.EmitDirectSpoutTest", "org.apache.heron.instance.bolt.BoltInstanceTest", + "org.apache.heron.instance.bolt.BoltStatefulInstanceTest", "org.apache.heron.instance.spout.ActivateDeactivateTest", "org.apache.heron.instance.spout.SpoutInstanceTest", + "org.apache.heron.instance.spout.SpoutStatefulInstanceTest", "org.apache.heron.metrics.GlobalMetricsTest", "org.apache.heron.metrics.MultiAssignableMetricTest", "org.apache.heron.network.ConnectTest", diff --git a/heron/instance/tests/java/org/apache/heron/instance/bolt/BoltStatefulInstanceTest.java b/heron/instance/tests/java/org/apache/heron/instance/bolt/BoltStatefulInstanceTest.java new file mode 100644 index 0000000..d5eac2c --- /dev/null +++ b/heron/instance/tests/java/org/apache/heron/instance/bolt/BoltStatefulInstanceTest.java @@ -0,0 +1,261 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.heron.instance.bolt; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import com.google.protobuf.ByteString; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.heron.api.Config; +import org.apache.heron.api.bolt.IRichBolt; +import org.apache.heron.api.generated.TopologyAPI; +import org.apache.heron.api.serializer.IPluggableSerializer; +import org.apache.heron.api.serializer.JavaSerializer; +import org.apache.heron.common.basics.SingletonRegistry; +import org.apache.heron.common.utils.misc.PhysicalPlanHelper; +import org.apache.heron.instance.InstanceControlMsg; +import org.apache.heron.instance.SlaveTester; +import org.apache.heron.proto.system.HeronTuples; +import org.apache.heron.proto.system.PhysicalPlans; +import org.apache.heron.resource.Constants; +import org.apache.heron.resource.MockPhysicalPlansBuilder; +import org.apache.heron.resource.TestSpout; +import org.apache.heron.resource.TestStatefulBolt; +import org.apache.heron.resource.TestTwoPhaseStatefulBolt; +import org.apache.heron.resource.UnitTestHelper; + +import static org.junit.Assert.*; + +/** + * 
Test if stateful bolt is able to respond to incoming control/data tuples as expected. + */ +public class BoltStatefulInstanceTest { + private SlaveTester slaveTester; + private static IPluggableSerializer serializer = new JavaSerializer(); + + @Before + public void before() { + slaveTester = new SlaveTester(); + slaveTester.start(); + } + + @After + public void after() throws NoSuchFieldException, IllegalAccessException { + slaveTester.stop(); + } + + @Test + public void testPreSaveAndPostSave() throws Exception { + CountDownLatch preSaveLatch = new CountDownLatch(1); + CountDownLatch postSaveLatch = new CountDownLatch(1); + SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch); + SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, postSaveLatch); + + slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0")); + slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0")); + slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt()); + + // initially non of preSave or postSave are invoked yet + assertEquals(1, preSaveLatch.getCount()); + assertEquals(1, postSaveLatch.getCount()); + + // this should invoke preSave + slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0")); + assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(0, preSaveLatch.getCount()); + assertEquals(1, postSaveLatch.getCount()); + + // this should invoke postSave + slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0", "p0")); + assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(0, preSaveLatch.getCount()); + assertEquals(0, postSaveLatch.getCount()); + } + + @Test + public void testPreRestore() throws InterruptedException { + CountDownLatch preRestoreLatch = new CountDownLatch(1); + 
SingletonRegistry.INSTANCE.registerSingleton(Constants.PRERESTORE_LATCH, preRestoreLatch); + + slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0")); + slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0")); + slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt()); + + assertEquals(1, preRestoreLatch.getCount()); + + slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("cx")); + + assertTrue(preRestoreLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(0, preRestoreLatch.getCount()); + } + + /** + * Ensure that for ITwoPhaseStatefulComponent bolts, after a preSave, execute will not be invoked + * unless the corresponding postSave is called. + */ + @Test + public void testPostSaveBlockExecute() throws Exception { + CountDownLatch preSaveLatch = new CountDownLatch(1); + CountDownLatch postSaveLatch = new CountDownLatch(1); + + CountDownLatch executeLatch = new CountDownLatch(1); // expect to execute one tuple + + SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch); + SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, postSaveLatch); + SingletonRegistry.INSTANCE.registerSingleton(Constants.EXECUTE_LATCH, executeLatch); + + slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0")); + slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0")); + slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt()); + + // initially non of preSave or postSave are invoked yet + assertEquals(1, preSaveLatch.getCount()); + assertEquals(1, postSaveLatch.getCount()); + assertEquals(1, executeLatch.getCount()); + + // this should invoke preSave + slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0")); + + // put a data tuple into the inStreamQueue + 
slaveTester.getInStreamQueue().offer(buildTupleSet()); + + assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(0, preSaveLatch.getCount()); + assertEquals(1, postSaveLatch.getCount()); + assertEquals(1, executeLatch.getCount()); + + // Wait for a bounded amount of time, assert that the tuple will not execute as it is + // blocked on postSave. This is because we only want to allow one uncommitted "transaction" on + // each task. See the design doc for more details. + assertFalse(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(0, preSaveLatch.getCount()); + assertEquals(1, postSaveLatch.getCount()); + assertEquals(1, executeLatch.getCount()); + + // this should invoke postSave + slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0", "p0")); + assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertTrue(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + + assertEquals(0, preSaveLatch.getCount()); + assertEquals(0, postSaveLatch.getCount()); + assertEquals(0, executeLatch.getCount()); + } + + /** + * Ensure that the aforementioned behaviour does not apply for bolts that don't implement + * ITwoPhaseStatefulComponent + */ + @Test + public void testExecuteNotBlocked() throws Exception { + CountDownLatch preSaveLatch = new CountDownLatch(1); + CountDownLatch executeLatch = new CountDownLatch(1); // expect to execute one tuple + + SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch); + SingletonRegistry.INSTANCE.registerSingleton(Constants.EXECUTE_LATCH, executeLatch); + + slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0")); + slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0")); + 
slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageForStatefulBolt()); + + // initially non of preSave or postSave are invoked yet + assertEquals(1, preSaveLatch.getCount()); + assertEquals(1, executeLatch.getCount()); + + // this should invoke preSave + slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0")); + + // put a data tuple into the inStreamQueue + slaveTester.getInStreamQueue().offer(buildTupleSet()); + + assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(0, preSaveLatch.getCount()); + + // no need to wait for postSave as the bolt doesn't implement ITwoPhaseStatefulComponent + assertTrue(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(0, preSaveLatch.getCount()); + assertEquals(0, executeLatch.getCount()); + } + + // build a tuple set that contains one data tuple + private HeronTuples.HeronTupleSet buildTupleSet() { + HeronTuples.HeronTupleSet.Builder heronTupleSet = HeronTuples.HeronTupleSet.newBuilder(); + heronTupleSet.setSrcTaskId(1); + HeronTuples.HeronDataTupleSet.Builder dataTupleSet = HeronTuples.HeronDataTupleSet.newBuilder(); + TopologyAPI.StreamId.Builder streamId = TopologyAPI.StreamId.newBuilder(); + streamId.setComponentName("test-spout"); + streamId.setId("default"); + dataTupleSet.setStream(streamId); + + HeronTuples.HeronDataTuple.Builder dataTuple = HeronTuples.HeronDataTuple.newBuilder(); + dataTuple.setKey(0); + + HeronTuples.RootId.Builder rootId = HeronTuples.RootId.newBuilder(); + rootId.setKey(0); + rootId.setTaskid(0); + dataTuple.addRoots(rootId); + + dataTuple.addValues(ByteString.copyFrom(serializer.serialize("A"))); + dataTupleSet.addTuples(dataTuple); + heronTupleSet.setData(dataTupleSet); + + return heronTupleSet.build(); + } + + private InstanceControlMsg buildPhysicalPlanMessageFor2PCBolt() { + return buildPhysicalPlanMessage(new TestTwoPhaseStatefulBolt()); + } + + private 
InstanceControlMsg buildPhysicalPlanMessageForStatefulBolt() { + return buildPhysicalPlanMessage(new TestStatefulBolt()); + } + + private InstanceControlMsg buildPhysicalPlanMessage(IRichBolt bolt) { + PhysicalPlans.PhysicalPlan physicalPlan = + MockPhysicalPlansBuilder + .newBuilder() + .withTopologyConfig(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE, -1) + .withTopologyState(TopologyAPI.TopologyState.RUNNING) + .withSpoutInstance( + "test-spout", + 0, + "spout-id", + new TestSpout() + ) + .withBoltInstance( + "test-bolt", + 1, + "bolt-id", + "test-spout", + bolt + ) + .build(); + + PhysicalPlanHelper ph = new PhysicalPlanHelper(physicalPlan, "bolt-id"); + + return InstanceControlMsg.newBuilder() + .setNewPhysicalPlanHelper(ph) + .build(); + } +} diff --git a/heron/instance/tests/java/org/apache/heron/instance/spout/SpoutStatefulInstanceTest.java b/heron/instance/tests/java/org/apache/heron/instance/spout/SpoutStatefulInstanceTest.java new file mode 100644 index 0000000..4d65aec --- /dev/null +++ b/heron/instance/tests/java/org/apache/heron/instance/spout/SpoutStatefulInstanceTest.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.heron.instance.spout; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.heron.api.Config; +import org.apache.heron.api.generated.TopologyAPI; +import org.apache.heron.api.serializer.IPluggableSerializer; +import org.apache.heron.api.serializer.JavaSerializer; +import org.apache.heron.api.spout.IRichSpout; +import org.apache.heron.common.basics.SingletonRegistry; +import org.apache.heron.common.utils.misc.PhysicalPlanHelper; +import org.apache.heron.instance.InstanceControlMsg; +import org.apache.heron.instance.SlaveTester; +import org.apache.heron.proto.system.PhysicalPlans; +import org.apache.heron.resource.Constants; +import org.apache.heron.resource.MockPhysicalPlansBuilder; +import org.apache.heron.resource.TestBolt; +import org.apache.heron.resource.TestStatefulSpout; +import org.apache.heron.resource.TestTwoPhaseStatefulSpout; +import org.apache.heron.resource.UnitTestHelper; + +import static org.junit.Assert.*; + +public class SpoutStatefulInstanceTest { + + private SlaveTester slaveTester; + private static IPluggableSerializer serializer = new JavaSerializer(); + + @Before + public void before() { + slaveTester = new SlaveTester(); + slaveTester.start(); + } + + @After + public void after() throws NoSuchFieldException, IllegalAccessException { + slaveTester.stop(); + } + + @Test + public void testPreSaveAndPostSave() throws Exception { + CountDownLatch preSaveLatch = new CountDownLatch(1); + CountDownLatch postSaveLatch = new CountDownLatch(1); + SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch); + SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, postSaveLatch); + + slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0")); +
slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0")); + slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCSpout()); + + // initially neither preSave nor postSave has been invoked + assertEquals(1, preSaveLatch.getCount()); + assertEquals(1, postSaveLatch.getCount()); + + // this should invoke preSave + slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0")); + assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(0, preSaveLatch.getCount()); + assertEquals(1, postSaveLatch.getCount()); + + // this should invoke postSave + slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0", "p0")); + assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(0, preSaveLatch.getCount()); + assertEquals(0, postSaveLatch.getCount()); + } + + @Test + public void testPreRestore() throws InterruptedException { + CountDownLatch preRestoreLatch = new CountDownLatch(1); + SingletonRegistry.INSTANCE.registerSingleton(Constants.PRERESTORE_LATCH, preRestoreLatch); + + slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0")); + slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0")); + slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCSpout()); + + assertEquals(1, preRestoreLatch.getCount()); + + slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("cx")); + + assertTrue(preRestoreLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(0, preRestoreLatch.getCount()); + } + + /** + * Ensure that for ITwoPhaseStatefulComponent spouts, after a preSave, no tuple will be emitted + * unless the corresponding postSave is called.
+ */ + @Test + public void testPostSaveBlockExecute() throws Exception { + // when this boolean is set to false, nextTuple on the spout will be run, but the spout will + // make sure to not emit any tuples. + AtomicBoolean shouldStartEmit = new AtomicBoolean(false); + SingletonRegistry.INSTANCE.registerSingleton( + Constants.SPOUT_SHOULD_START_EMIT, shouldStartEmit); + + CountDownLatch preSaveLatch = new CountDownLatch(1); + CountDownLatch postSaveLatch = new CountDownLatch(1); + CountDownLatch emitLatch = new CountDownLatch(1); + + SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch); + SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, postSaveLatch); + SingletonRegistry.INSTANCE.registerSingleton(Constants.EMIT_LATCH, emitLatch); + + slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0")); + slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0")); + slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCSpout()); + + // initially neither preSave nor postSave has been invoked + assertEquals(1, preSaveLatch.getCount()); + assertEquals(1, postSaveLatch.getCount()); + + // this should invoke preSave + slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0")); + + // wait for preSave; from here on the spout must not emit until postSave is called + assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(0, preSaveLatch.getCount()); + assertEquals(1, postSaveLatch.getCount()); + assertEquals(1, emitLatch.getCount()); + + // only now let the spout emit, so nextTuple cannot emit before preSave has run + assertFalse(shouldStartEmit.getAndSet(true)); + + // Wait for a bounded amount of time, assert that the spout will not emit tuples as it is + // blocked on postSave. This is because we only want to allow one uncommitted "transaction" on + // each task. See the design doc for more details.
+ assertFalse(emitLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(0, preSaveLatch.getCount()); + assertEquals(1, postSaveLatch.getCount()); + assertEquals(1, emitLatch.getCount()); + + // this should invoke postSave + slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0", "p0")); + assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertTrue(emitLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + + assertEquals(0, preSaveLatch.getCount()); + assertEquals(0, postSaveLatch.getCount()); + assertEquals(0, emitLatch.getCount()); + } + + /** + * Ensure that the aforementioned behaviour does not apply for spouts that don't implement + * ITwoPhaseStatefulComponent + */ + @Test + public void testExecuteNotBlocked() throws Exception { + // when this boolean is set to false, nextTuple on the spout will be run, but the spout will + // make sure to not emit any tuples.
+ AtomicBoolean shouldStartEmit = new AtomicBoolean(false); + SingletonRegistry.INSTANCE.registerSingleton( + Constants.SPOUT_SHOULD_START_EMIT, shouldStartEmit); + + CountDownLatch preSaveLatch = new CountDownLatch(1); + CountDownLatch emitLatch = new CountDownLatch(1); + + SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch); + SingletonRegistry.INSTANCE.registerSingleton(Constants.EMIT_LATCH, emitLatch); + + slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0")); + slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0")); + slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageForStatefulSpout()); + + // initially neither preSave nor an emit has happened + assertEquals(1, preSaveLatch.getCount()); + assertEquals(1, emitLatch.getCount()); + + // this should invoke preSave + slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0")); + + assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(0, preSaveLatch.getCount()); + + // tell the spout to start emitting tuples only after preSave has run + assertFalse(shouldStartEmit.getAndSet(true)); + + // no need to wait for postSave as the spout doesn't implement ITwoPhaseStatefulComponent + assertTrue(emitLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS)); + assertEquals(0, preSaveLatch.getCount()); + assertEquals(0, emitLatch.getCount()); + } + + private InstanceControlMsg buildPhysicalPlanMessageFor2PCSpout() { + return buildPhysicalPlanMessage(new TestTwoPhaseStatefulSpout()); + } + + private InstanceControlMsg buildPhysicalPlanMessageForStatefulSpout() { + return buildPhysicalPlanMessage(new TestStatefulSpout()); + } + + private InstanceControlMsg buildPhysicalPlanMessage(IRichSpout spout) { + PhysicalPlans.PhysicalPlan physicalPlan = + MockPhysicalPlansBuilder + .newBuilder() +
+ .withTopologyConfig(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE, -1) + .withTopologyState(TopologyAPI.TopologyState.RUNNING) + .withSpoutInstance( + "test-spout", + 0, + "spout-id", + spout + ) + .withBoltInstance( + "test-bolt", + 1, + "bolt-id", + "test-spout", + new TestBolt() + ) + .build(); + + PhysicalPlanHelper ph = new PhysicalPlanHelper(physicalPlan, "spout-id"); + + return InstanceControlMsg.newBuilder() + .setNewPhysicalPlanHelper(ph) + .build(); + } +} diff --git a/heron/instance/tests/java/org/apache/heron/resource/Constants.java b/heron/instance/tests/java/org/apache/heron/resource/Constants.java index 3ffe9b0..9b7d2ce 100644 --- a/heron/instance/tests/java/org/apache/heron/resource/Constants.java +++ b/heron/instance/tests/java/org/apache/heron/resource/Constants.java @@ -39,12 +39,18 @@ public final class Constants { public static final String ACK_COUNT = "ack-count"; public static final String EXECUTE_LATCH = "execute-latch"; + public static final String EMIT_LATCH = "emit-latch"; public static final String FAIL_LATCH = "fail-latch"; public static final String ACK_LATCH = "ack-latch"; public static final String ACTIVATE_COUNT_LATCH = "activate-count-latch"; public static final String DEACTIVATE_COUNT_LATCH = "deactivate-count-latch"; + public static final String PRESAVE_LATCH = "preSave-latch"; + public static final String POSTSAVE_LATCH = "postSave-latch"; + public static final String PRERESTORE_LATCH = "preRestore-latch"; + public static final String SPOUT_SHOULD_START_EMIT = "spout-should-start-emit"; + public static final String RECEIVED_STRING_LIST = "received-string-list"; public static final String HERON_SYSTEM_CONFIG = "org.apache.heron.common.config.SystemConfig"; diff --git a/heron/instance/tests/java/org/apache/heron/resource/MockPhysicalPlansBuilder.java b/heron/instance/tests/java/org/apache/heron/resource/MockPhysicalPlansBuilder.java new file mode 100644 index 0000000..14a36cf --- /dev/null +++
b/heron/instance/tests/java/org/apache/heron/resource/MockPhysicalPlansBuilder.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.heron.resource; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Ignore; + +import org.apache.heron.api.Config; +import org.apache.heron.api.bolt.IRichBolt; +import org.apache.heron.api.generated.TopologyAPI; +import org.apache.heron.api.spout.IRichSpout; +import org.apache.heron.api.topology.TopologyBuilder; +import org.apache.heron.proto.system.PhysicalPlans; + +@Ignore +public final class MockPhysicalPlansBuilder { // fluent builder for test PhysicalPlan fixtures + + private PhysicalPlans.PhysicalPlan.Builder pPlan; + private TopologyBuilder topologyBuilder; + private List<PhysicalPlans.Instance.Builder> instanceBuilders; + + private Config conf; // stays null unless withTopologyConfig(...) is called + private TopologyAPI.TopologyState initialTopologyState; // null unless withTopologyState(...) + + private MockPhysicalPlansBuilder() { + pPlan = PhysicalPlans.PhysicalPlan.newBuilder(); + topologyBuilder = new TopologyBuilder(); + instanceBuilders = new ArrayList<>(); + + conf = null; + initialTopologyState = null; + } + + public static MockPhysicalPlansBuilder newBuilder() { + return new MockPhysicalPlansBuilder(); + } + + public
MockPhysicalPlansBuilder withTopologyConfig( + Config.TopologyReliabilityMode reliabilityMode, + int messageTimeout + ) { + conf = new Config(); + conf.setTeamEmail("streaming-comp...@twitter.com"); + conf.setTeamName("stream-computing"); + conf.setTopologyProjectName("heron-integration-test"); + conf.setNumStmgrs(1); + conf.setMaxSpoutPending(100); + conf.setTopologyReliabilityMode(reliabilityMode); + if (messageTimeout != -1) { // -1 leaves message timeouts unconfigured + conf.setMessageTimeoutSecs(messageTimeout); + conf.put("topology.enable.message.timeouts", "true"); + } + + return this; + } + + public MockPhysicalPlansBuilder withTopologyState(TopologyAPI.TopologyState topologyState) { + initialTopologyState = topologyState; + return this; + } + + public MockPhysicalPlansBuilder withSpoutInstance( + String componentName, + int taskId, + String instanceId, + IRichSpout spout + ) { + PhysicalPlans.InstanceInfo.Builder spoutInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder(); + spoutInstanceInfo.setComponentName(componentName); + spoutInstanceInfo.setTaskId(taskId); + spoutInstanceInfo.setComponentIndex(0); // parallelism is 1 (setSpout below) + + PhysicalPlans.Instance.Builder spoutInstance = PhysicalPlans.Instance.newBuilder(); + spoutInstance.setInstanceId(instanceId); + spoutInstance.setStmgrId("stream-manager-id"); + spoutInstance.setInfo(spoutInstanceInfo); + + topologyBuilder.setSpout(componentName, spout, 1); + instanceBuilders.add(spoutInstance); + + return this; + } + + public MockPhysicalPlansBuilder withBoltInstance( + String componentName, + int taskId, + String instanceId, + String upStreamComponentId, + IRichBolt bolt + ) { + PhysicalPlans.InstanceInfo.Builder boltInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder(); + boltInstanceInfo.setComponentName(componentName); + boltInstanceInfo.setTaskId(taskId); + boltInstanceInfo.setComponentIndex(0); // parallelism is 1 (setBolt below) + + PhysicalPlans.Instance.Builder boltInstance = PhysicalPlans.Instance.newBuilder(); + boltInstance.setInstanceId(instanceId); + boltInstance.setStmgrId("stream-manager-id"); +
boltInstance.setInfo(boltInstanceInfo); + + topologyBuilder.setBolt(componentName, bolt, 1) + .shuffleGrouping(upStreamComponentId); + instanceBuilders.add(boltInstance); + + return this; + } + + public PhysicalPlans.PhysicalPlan build() { // not idempotent: re-adds stmgr/instances + addStmgrs(); + pPlan.setTopology(buildTopology()); + + for (PhysicalPlans.Instance.Builder b : instanceBuilders) { + pPlan.addInstances(b); + } + + return pPlan.build(); + } + + private TopologyAPI.Topology buildTopology() { + return topologyBuilder + .createTopology() + .setName("topology-name") + .setConfig(conf) + .setState(initialTopologyState) + .getTopology(); + } + + private void addStmgrs() { // one stmgr, matching the stmgr id set on every instance + PhysicalPlans.StMgr.Builder stmgr = PhysicalPlans.StMgr.newBuilder(); + stmgr.setId("stream-manager-id"); + stmgr.setHostName("127.0.0.1"); + stmgr.setDataPort(8888); + stmgr.setLocalEndpoint("endpoint"); + pPlan.addStmgrs(stmgr); + } + +} diff --git a/heron/instance/tests/java/org/apache/heron/resource/TestStatefulBolt.java b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulBolt.java new file mode 100644 index 0000000..e16efff --- /dev/null +++ b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulBolt.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.heron.resource; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import org.apache.heron.api.bolt.BaseRichBolt; +import org.apache.heron.api.bolt.OutputCollector; +import org.apache.heron.api.state.State; +import org.apache.heron.api.topology.IStatefulComponent; +import org.apache.heron.api.topology.OutputFieldsDeclarer; +import org.apache.heron.api.topology.TopologyContext; +import org.apache.heron.api.tuple.Tuple; +import org.apache.heron.common.basics.SingletonRegistry; + +public class TestStatefulBolt extends BaseRichBolt + implements IStatefulComponent<String, String> { // not ITwoPhaseStatefulComponent + @Override + public void prepare( + Map<String, Object> heronConf, + TopologyContext context, + OutputCollector collector) { + } + + @Override + public void execute(Tuple input) { // counts down EXECUTE_LATCH, if registered + CountDownLatch tupleExecutedLatch = + (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.EXECUTE_LATCH); + + if (tupleExecutedLatch != null) { + tupleExecutedLatch.countDown(); + } + } + + @Override + public void initState(State<String, String> state) { + } + + @Override + public void preSave(String checkpointId) { // counts down PRESAVE_LATCH, if registered + CountDownLatch preSaveLatch = + (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH); + + if (preSaveLatch != null) { + preSaveLatch.countDown(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } +} diff --git a/heron/instance/tests/java/org/apache/heron/resource/TestStatefulSpout.java b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulSpout.java new file mode 100644 index 0000000..c22a917 --- /dev/null +++ b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulSpout.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.heron.resource; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.heron.api.spout.BaseRichSpout; +import org.apache.heron.api.spout.SpoutOutputCollector; +import org.apache.heron.api.state.State; +import org.apache.heron.api.topology.IStatefulComponent; +import org.apache.heron.api.topology.OutputFieldsDeclarer; +import org.apache.heron.api.topology.TopologyContext; +import org.apache.heron.common.basics.SingletonRegistry; + +public class TestStatefulSpout extends BaseRichSpout implements IStatefulComponent<String, String> { + @Override + public void open( + Map<String, Object> conf, + TopologyContext context, + SpoutOutputCollector collector) { + } + + @Override + public void nextTuple() { // no-op while SPOUT_SHOULD_START_EMIT is registered and false + AtomicBoolean shouldStartEmit = + (AtomicBoolean) SingletonRegistry.INSTANCE.getSingleton(Constants.SPOUT_SHOULD_START_EMIT); + + if (shouldStartEmit != null && !shouldStartEmit.get()) { + return; + } + + // "emit" a tuple: nothing is sent, EMIT_LATCH is counted down if registered + CountDownLatch emitLatch = + (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.EMIT_LATCH); + + if (emitLatch != null) { + emitLatch.countDown(); + } + } + + @Override + public void initState(State<String, String> state) {
+ } + + @Override + public void preSave(String checkpointId) { // counts down PRESAVE_LATCH, if registered + CountDownLatch preSaveLatch = + (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH); + + if (preSaveLatch != null) { + preSaveLatch.countDown(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } +} diff --git a/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulBolt.java b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulBolt.java new file mode 100644 index 0000000..b67a9ee --- /dev/null +++ b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulBolt.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.heron.resource; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import org.junit.Ignore; + +import org.apache.heron.api.bolt.BaseRichBolt; +import org.apache.heron.api.bolt.OutputCollector; +import org.apache.heron.api.state.State; +import org.apache.heron.api.topology.ITwoPhaseStatefulComponent; +import org.apache.heron.api.topology.OutputFieldsDeclarer; +import org.apache.heron.api.topology.TopologyContext; +import org.apache.heron.api.tuple.Fields; +import org.apache.heron.api.tuple.Tuple; +import org.apache.heron.common.basics.SingletonRegistry; + +@Ignore +public class TestTwoPhaseStatefulBolt extends BaseRichBolt + implements ITwoPhaseStatefulComponent<String, String> { + + private static final long serialVersionUID = -5160420613503624743L; + + @Override + public void prepare( + Map<String, Object> map, + TopologyContext topologyContext, + OutputCollector collector) { + } + + @Override + public void execute(Tuple tuple) { // counts down EXECUTE_LATCH, if registered + CountDownLatch tupleExecutedLatch = + (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.EXECUTE_LATCH); + + if (tupleExecutedLatch != null) { + tupleExecutedLatch.countDown(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(new Fields("word")); + } + + @Override + public void postSave(String checkpointId) { // counts down POSTSAVE_LATCH, if registered + CountDownLatch postSaveLatch = + (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.POSTSAVE_LATCH); + + if (postSaveLatch != null) { + postSaveLatch.countDown(); + } + } + + @Override + public void preRestore(String checkpointId) { // counts down PRERESTORE_LATCH, if registered + CountDownLatch preRestoreLatch = + (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRERESTORE_LATCH); + + if (preRestoreLatch != null) { + preRestoreLatch.countDown(); + } + } + + @Override + public void initState(State<String, String> state) { + } + + @Override + public void preSave(String checkpointId) { // counts down PRESAVE_LATCH, if registered +
CountDownLatch preSaveLatch = + (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH); + + if (preSaveLatch != null) { + preSaveLatch.countDown(); + } + } +} diff --git a/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulSpout.java b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulSpout.java new file mode 100644 index 0000000..a4b6efd --- /dev/null +++ b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulSpout.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.heron.resource; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.heron.api.spout.BaseRichSpout; +import org.apache.heron.api.spout.SpoutOutputCollector; +import org.apache.heron.api.state.State; +import org.apache.heron.api.topology.ITwoPhaseStatefulComponent; +import org.apache.heron.api.topology.OutputFieldsDeclarer; +import org.apache.heron.api.topology.TopologyContext; +import org.apache.heron.common.basics.SingletonRegistry; + +public class TestTwoPhaseStatefulSpout extends BaseRichSpout + implements ITwoPhaseStatefulComponent<String, String> { + @Override + public void open( + Map<String, Object> conf, + TopologyContext context, + SpoutOutputCollector collector) { + } + + @Override + public void nextTuple() { // no-op while SPOUT_SHOULD_START_EMIT is registered and false + AtomicBoolean shouldStartEmit = + (AtomicBoolean) SingletonRegistry.INSTANCE.getSingleton(Constants.SPOUT_SHOULD_START_EMIT); + + if (shouldStartEmit != null && !shouldStartEmit.get()) { + return; + } + + // "emit" a tuple: nothing is sent, EMIT_LATCH is counted down if registered + CountDownLatch emitLatch = + (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.EMIT_LATCH); + + if (emitLatch != null) { + emitLatch.countDown(); + } + } + + @Override + public void postSave(String checkpointId) { // counts down POSTSAVE_LATCH, if registered + CountDownLatch postSaveLatch = + (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.POSTSAVE_LATCH); + + if (postSaveLatch != null) { + postSaveLatch.countDown(); + } + } + + @Override + public void preRestore(String checkpointId) { // counts down PRERESTORE_LATCH, if registered + CountDownLatch preRestoreLatch = + (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRERESTORE_LATCH); + + if (preRestoreLatch != null) { + preRestoreLatch.countDown(); + } + } + + @Override + public void initState(State<String, String> state) { + } + + @Override + public void preSave(String checkpointId) { // counts down PRESAVE_LATCH, if registered + CountDownLatch preSaveLatch = + (CountDownLatch)
SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH); + + if (preSaveLatch != null) { + preSaveLatch.countDown(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } +} diff --git a/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java b/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java index 904ecab..8e39333 100644 --- a/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java +++ b/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java @@ -23,14 +23,17 @@ import java.lang.reflect.Field; import java.nio.file.Paths; import java.util.Map; +import com.google.protobuf.Message; + import org.junit.Ignore; import org.apache.heron.api.Config; import org.apache.heron.api.generated.TopologyAPI; -import org.apache.heron.api.topology.TopologyBuilder; import org.apache.heron.common.basics.SingletonRegistry; import org.apache.heron.common.config.SystemConfig; import org.apache.heron.common.config.SystemConfigKey; +import org.apache.heron.instance.InstanceControlMsg; +import org.apache.heron.proto.ckptmgr.CheckpointManager; import org.apache.heron.proto.stmgr.StreamManager; import org.apache.heron.proto.system.Common; import org.apache.heron.proto.system.PhysicalPlans; @@ -60,91 +63,36 @@ public final class UnitTestHelper { public static PhysicalPlans.PhysicalPlan getPhysicalPlan( boolean ackEnabled, int messageTimeout, - TopologyAPI.TopologyState topologyState) { - PhysicalPlans.PhysicalPlan.Builder pPlan = PhysicalPlans.PhysicalPlan.newBuilder(); - - setTopology(pPlan, ackEnabled, messageTimeout, topologyState); - - setInstances(pPlan); - - setStMgr(pPlan); - - return pPlan.build(); + TopologyAPI.TopologyState topologyState + ) { + Config.TopologyReliabilityMode reliabilityMode = ackEnabled + ?
Config.TopologyReliabilityMode.ATLEAST_ONCE + : Config.TopologyReliabilityMode.ATMOST_ONCE; + + return MockPhysicalPlansBuilder + .newBuilder() + .withTopologyConfig(reliabilityMode, messageTimeout) + .withTopologyState(topologyState) + .withSpoutInstance( + "test-spout", + 0, + "spout-id", + new TestSpout() + ) + .withBoltInstance( + "test-bolt", + 1, + "bolt-id", + "test-spout", + new TestBolt() + ) + .build(); } public static PhysicalPlans.PhysicalPlan getPhysicalPlan(boolean ackEnabled, int messageTimeout) { return getPhysicalPlan(ackEnabled, messageTimeout, TopologyAPI.TopologyState.RUNNING); } - private static void setTopology(PhysicalPlans.PhysicalPlan.Builder pPlan, boolean ackEnabled, - int messageTimeout, TopologyAPI.TopologyState topologyState) { - TopologyBuilder topologyBuilder = new TopologyBuilder(); - topologyBuilder.setSpout("test-spout", new TestSpout(), 1); - // Here we need case switch to corresponding grouping - topologyBuilder.setBolt("test-bolt", new TestBolt(), 1).shuffleGrouping("test-spout"); - - Config conf = new Config(); - conf.setTeamEmail("streaming-comp...@twitter.com"); - conf.setTeamName("stream-computing"); - conf.setTopologyProjectName("heron-integration-test"); - conf.setNumStmgrs(1); - conf.setMaxSpoutPending(100); - if (ackEnabled) { - conf.setTopologyReliabilityMode(Config.TopologyReliabilityMode.ATLEAST_ONCE); - } else { - conf.setTopologyReliabilityMode(Config.TopologyReliabilityMode.ATMOST_ONCE); - } - if (messageTimeout != -1) { - conf.setMessageTimeoutSecs(messageTimeout); - conf.put("topology.enable.message.timeouts", "true"); - } - - TopologyAPI.Topology fTopology = - topologyBuilder.createTopology(). - setName("topology-name"). - setConfig(conf). - setState(topologyState).
- getTopology(); - - pPlan.setTopology(fTopology); - } - - private static void setInstances(PhysicalPlans.PhysicalPlan.Builder pPlan) { - // Construct the spoutInstance - PhysicalPlans.InstanceInfo.Builder spoutInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder(); - spoutInstanceInfo.setComponentName("test-spout"); - spoutInstanceInfo.setTaskId(0); - spoutInstanceInfo.setComponentIndex(0); - - PhysicalPlans.Instance.Builder spoutInstance = PhysicalPlans.Instance.newBuilder(); - spoutInstance.setInstanceId("spout-id"); - spoutInstance.setStmgrId("stream-manager-id"); - spoutInstance.setInfo(spoutInstanceInfo); - - // Construct the boltInstanceInfo - PhysicalPlans.InstanceInfo.Builder boltInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder(); - boltInstanceInfo.setComponentName("test-bolt"); - boltInstanceInfo.setTaskId(1); - boltInstanceInfo.setComponentIndex(0); - - PhysicalPlans.Instance.Builder boltInstance = PhysicalPlans.Instance.newBuilder(); - boltInstance.setInstanceId("bolt-id"); - boltInstance.setStmgrId("stream-manager-id"); - boltInstance.setInfo(boltInstanceInfo); - - pPlan.addInstances(spoutInstance); - pPlan.addInstances(boltInstance); - } - - private static void setStMgr(PhysicalPlans.PhysicalPlan.Builder pPlan) { - PhysicalPlans.StMgr.Builder stmgr = PhysicalPlans.StMgr.newBuilder(); - stmgr.setId("stream-manager-id"); - stmgr.setHostName("127.0.0.1"); - stmgr.setDataPort(8888); - stmgr.setLocalEndpoint("endpoint"); - pPlan.addStmgrs(stmgr); - } - @SuppressWarnings("unchecked") public static void clearSingletonRegistry() throws IllegalAccessException, NoSuchFieldException { // Remove the Singleton by Reflection @@ -196,4 +144,60 @@ public final class UnitTestHelper { return registerInstanceResponse.build(); } + public static Message buildPersistStateMessage(String checkpointId) { + CheckpointManager.InitiateStatefulCheckpoint.Builder builder = CheckpointManager + .InitiateStatefulCheckpoint + .newBuilder(); + +
builder.setCheckpointId(checkpointId); + + return builder.build(); + } + + public static InstanceControlMsg buildRestoreInstanceState(String checkpointId) { + return InstanceControlMsg.newBuilder() + .setRestoreInstanceStateRequest( + CheckpointManager.RestoreInstanceStateRequest + .newBuilder() + .setState(CheckpointManager.InstanceStateCheckpoint + .newBuilder() + .setCheckpointId(checkpointId)) + .build() + ) + .build(); + } + + public static InstanceControlMsg buildStartInstanceProcessingMessage(String checkpointId) { + return InstanceControlMsg.newBuilder() + .setStartInstanceStatefulProcessing( + CheckpointManager.StartInstanceStatefulProcessing + .newBuilder() + .setCheckpointId(checkpointId) + .build() + ) + .build(); + } + + public static InstanceControlMsg buildCheckpointSavedMessage( + String checkpointId, + String packingPlanId + ) { + CheckpointManager.StatefulConsistentCheckpointSaved.Builder builder = CheckpointManager + .StatefulConsistentCheckpointSaved + .newBuilder(); + + CheckpointManager.StatefulConsistentCheckpoint.Builder ckptBuilder = CheckpointManager + .StatefulConsistentCheckpoint + .newBuilder(); + + ckptBuilder.setCheckpointId(checkpointId); + ckptBuilder.setPackingPlanId(packingPlanId); + + builder.setConsistentCheckpoint(ckptBuilder.build()); + + return InstanceControlMsg.newBuilder() + .setStatefulCheckpointSaved(builder.build()) + .build(); + } + } diff --git a/heron/proto/ckptmgr.proto b/heron/proto/ckptmgr.proto index ba6633a..ea6162c 100644 --- a/heron/proto/ckptmgr.proto +++ b/heron/proto/ckptmgr.proto @@ -108,6 +108,12 @@ message StartStatefulCheckpoint { required string checkpoint_id = 1; } +// Message broadcast to all stmgrs (each then forwards it to its instances) once a checkpoint becomes +// "globally consistent" +message StatefulConsistentCheckpointSaved { + required StatefulConsistentCheckpoint consistent_checkpoint = 1; +} + // Message sent by tmaster to stmgr asking them to reset their instances // to this checkpoint
message RestoreTopologyStateRequest { diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp b/heron/stmgr/src/cpp/manager/instance-server.cpp index 6c0f973..9a752de 100644 --- a/heron/stmgr/src/cpp/manager/instance-server.cpp +++ b/heron/stmgr/src/cpp/manager/instance-server.cpp @@ -498,6 +498,15 @@ void InstanceServer::BroadcastNewPhysicalPlan(const proto::system::PhysicalPlan& } } +void InstanceServer::BroadcastStatefulCheckpointSaved( + const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg) { + for (auto & iter : active_instances_) { + LOG(INFO) << "Sending checkpoint: " << _msg.consistent_checkpoint().checkpoint_id() + << " saved message to instance with task_id: " << iter.second; + SendMessage(iter.first, _msg); + } +} + void InstanceServer::SetRateLimit(const proto::system::PhysicalPlan& _pplan, const std::string& _component, Connection* _conn) const { diff --git a/heron/stmgr/src/cpp/manager/instance-server.h b/heron/stmgr/src/cpp/manager/instance-server.h index df4b5e9..b4a6038 100644 --- a/heron/stmgr/src/cpp/manager/instance-server.h +++ b/heron/stmgr/src/cpp/manager/instance-server.h @@ -72,6 +72,9 @@ class InstanceServer : public Server { void BroadcastNewPhysicalPlan(const proto::system::PhysicalPlan& _pplan); + void BroadcastStatefulCheckpointSaved( + const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg); + virtual bool HaveAllInstancesConnectedToUs() const { return active_instances_.size() == expected_instances_.size(); } diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp index 6510271..aa6625e 100644 --- a/heron/stmgr/src/cpp/manager/stmgr.cpp +++ b/heron/stmgr/src/cpp/manager/stmgr.cpp @@ -368,12 +368,18 @@ void StMgr::CreateTMasterClient(shared_ptr<proto::tmaster::TMasterLocation> tmas this->StartStatefulProcessing(checkpoint_id); }; + auto broadcast_checkpoint_saved = + [this](const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg) { + this->BroadcastCheckpointSaved(_msg); + 
}; + tmaster_client_ = make_shared<TMasterClient>(eventLoop_, master_options, stmgr_id_, stmgr_host_, data_port_, local_data_port_, shell_port_, std::move(pplan_watch), std::move(stateful_checkpoint_watch), std::move(restore_topology_watch), - std::move(start_stateful_watch)); + std::move(start_stateful_watch), + std::move(broadcast_checkpoint_saved)); } void StMgr::CreateTupleCache() { @@ -1080,6 +1086,12 @@ void StMgr::RestoreTopologyState(sp_string _checkpoint_id, sp_int64 _restore_txi stateful_restorer_->StartRestore(_checkpoint_id, _restore_txid, local_taskids, *pplan_); } +// broadcast the news that the checkpoint has been saved to all instances connected to this stmgr +void StMgr::BroadcastCheckpointSaved( + const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg) { + instance_server_->BroadcastStatefulCheckpointSaved(_msg); +} + // Called by TmasterClient when it receives directive from tmaster // to start processing after having previously recovered the state at _checkpoint_id void StMgr::StartStatefulProcessing(sp_string _checkpoint_id) { diff --git a/heron/stmgr/src/cpp/manager/stmgr.h b/heron/stmgr/src/cpp/manager/stmgr.h index decdc39..582ac3a 100644 --- a/heron/stmgr/src/cpp/manager/stmgr.h +++ b/heron/stmgr/src/cpp/manager/stmgr.h @@ -197,6 +197,10 @@ class StMgr { void HandleStatefulRestoreDone(proto::system::StatusCode _status, std::string _checkpoint_id, sp_int64 _restore_txid); + // Called when stmgr received StatefulConsistentCheckpointSaved message from TMaster + // Then, the stmgr will forward this fact to all heron instances connected to it + void BroadcastCheckpointSaved(const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg); + // Patch new physical plan with internal hydrated topology but keep new topology data: // - new topology state // - new topology/component config diff --git a/heron/stmgr/src/cpp/manager/tmaster-client.cpp b/heron/stmgr/src/cpp/manager/tmaster-client.cpp index f1e27d8..657077d 100644 --- 
a/heron/stmgr/src/cpp/manager/tmaster-client.cpp +++ b/heron/stmgr/src/cpp/manager/tmaster-client.cpp @@ -41,7 +41,9 @@ TMasterClient::TMasterClient(shared_ptr<EventLoop> eventLoop, const NetworkOptio VCallback<shared_ptr<proto::system::PhysicalPlan>> _pplan_watch, VCallback<sp_string> _stateful_checkpoint_watch, VCallback<sp_string, sp_int64> _restore_topology_watch, - VCallback<sp_string> _start_stateful_watch) + VCallback<sp_string> _start_stateful_watch, + VCallback<const proto::ckptmgr::StatefulConsistentCheckpointSaved&> + _broadcast_checkpoint_saved) : Client(eventLoop, _options), stmgr_id_(_stmgr_id), stmgr_host_(_stmgr_host), @@ -53,6 +55,7 @@ TMasterClient::TMasterClient(shared_ptr<EventLoop> eventLoop, const NetworkOptio stateful_checkpoint_watch_(std::move(_stateful_checkpoint_watch)), restore_topology_watch_(std::move(_restore_topology_watch)), start_stateful_watch_(std::move(_start_stateful_watch)), + broadcast_checkpoint_saved_(_broadcast_checkpoint_saved), reconnect_timer_id(0), heartbeat_timer_id(0), reconnect_attempts_(0) { @@ -74,6 +77,7 @@ TMasterClient::TMasterClient(shared_ptr<EventLoop> eventLoop, const NetworkOptio InstallMessageHandler(&TMasterClient::HandleStatefulCheckpointMessage); InstallMessageHandler(&TMasterClient::HandleRestoreTopologyStateRequest); InstallMessageHandler(&TMasterClient::HandleStartStmgrStatefulProcessing); + InstallMessageHandler(&TMasterClient::HandleStatefulCheckpointSavedMessage); } TMasterClient::~TMasterClient() { @@ -303,6 +307,11 @@ void TMasterClient::HandleStartStmgrStatefulProcessing( start_stateful_watch_(_message->checkpoint_id()); } +void TMasterClient::HandleStatefulCheckpointSavedMessage( + pool_unique_ptr<proto::ckptmgr::StatefulConsistentCheckpointSaved> _msg) { + broadcast_checkpoint_saved_(*_msg); +} + void TMasterClient::SendResetTopologyState(const std::string& _dead_stmgr, int32_t _dead_task, const std::string& _reason) { diff --git a/heron/stmgr/src/cpp/manager/tmaster-client.h 
b/heron/stmgr/src/cpp/manager/tmaster-client.h index ec83406..2457d72 100644 --- a/heron/stmgr/src/cpp/manager/tmaster-client.h +++ b/heron/stmgr/src/cpp/manager/tmaster-client.h @@ -42,7 +42,9 @@ class TMasterClient : public Client { VCallback<shared_ptr<proto::system::PhysicalPlan>> _pplan_watch, VCallback<sp_string> _stateful_checkpoint_watch, VCallback<sp_string, sp_int64> _restore_topology_watch, - VCallback<sp_string> _start_stateful_watch); + VCallback<sp_string> _start_stateful_watch, + VCallback<const proto::ckptmgr::StatefulConsistentCheckpointSaved&> + _broadcast_checkpoint_saved); virtual ~TMasterClient(); // Told by the upper layer to disconnect and self destruct @@ -88,6 +90,9 @@ class TMasterClient : public Client { void HandleStartStmgrStatefulProcessing( pool_unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _msg); + void HandleStatefulCheckpointSavedMessage( + pool_unique_ptr<proto::ckptmgr::StatefulConsistentCheckpointSaved> _msg); + void OnReConnectTimer(); void OnHeartbeatTimer(); void SendRegisterRequest(); @@ -116,6 +121,9 @@ class TMasterClient : public Client { // We invoke this callback upon receiving a StartStatefulProcessing message from tmaster // passing in the checkpoint id VCallback<sp_string> start_stateful_watch_; + // This callback will be invoked upon receiving a StatefulConsistentCheckpointSaved message. 
+ // We will then forward this message to all the instances connected to this stmgr + VCallback<const proto::ckptmgr::StatefulConsistentCheckpointSaved&> broadcast_checkpoint_saved_; // Configs to be read sp_int32 reconnect_tmaster_interval_sec_; diff --git a/heron/tmaster/src/cpp/manager/stateful-controller.cpp b/heron/tmaster/src/cpp/manager/stateful-controller.cpp index 31dcf7b..a348c10 100644 --- a/heron/tmaster/src/cpp/manager/stateful-controller.cpp +++ b/heron/tmaster/src/cpp/manager/stateful-controller.cpp @@ -55,14 +55,18 @@ StatefulController::StatefulController(const std::string& _topology_name, shared_ptr<heron::common::HeronStateMgr> _state_mgr, std::chrono::high_resolution_clock::time_point _tmaster_start_time, shared_ptr<common::MetricsMgrSt> _metrics_manager_client, - std::function<void(std::string)> _ckpt_save_watcher) - : topology_name_(_topology_name), ckpt_record_(std::move(_ckpt)), state_mgr_(_state_mgr), - metrics_manager_client_(_metrics_manager_client) { + std::function<void(const proto::ckptmgr::StatefulConsistentCheckpoints&)> + _ckpt_save_watcher) + : topology_name_(_topology_name), + ckpt_record_(std::move(_ckpt)), + state_mgr_(_state_mgr), + metrics_manager_client_(_metrics_manager_client), + ckpt_save_watcher_(_ckpt_save_watcher) { checkpointer_ = make_unique<StatefulCheckpointer>(_tmaster_start_time); restorer_ = make_unique<StatefulRestorer>(); count_metrics_ = make_shared<common::MultiCountMetric>(); + metrics_manager_client_->register_metric("__stateful_controller", count_metrics_); - ckpt_save_watcher_ = _ckpt_save_watcher; } StatefulController::~StatefulController() { @@ -162,13 +166,13 @@ void StatefulController::HandleCheckpointSave( shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _new_ckpt, proto::system::StatusCode _status) { if (_status == proto::system::OK) { - LOG(INFO) << "Successfully saved " << _new_ckpt->consistent_checkpoints(0).checkpoint_id() - << " as the new globally consistent checkpoint"; ckpt_record_ 
= std::move(_new_ckpt); - std::string oldest_ckpt = - ckpt_record_->consistent_checkpoints(ckpt_record_->consistent_checkpoints_size() - 1) - .checkpoint_id(); - ckpt_save_watcher_(oldest_ckpt); + + LOG(INFO) << "Successfully saved " + << ckpt_record_->consistent_checkpoints(0).checkpoint_id() + << " as the latest globally consistent checkpoint"; + + ckpt_save_watcher_(*ckpt_record_); } else { LOG(ERROR) << "Error saving " << _new_ckpt->consistent_checkpoints(0).checkpoint_id() << " as the new globally consistent checkpoint " diff --git a/heron/tmaster/src/cpp/manager/stateful-controller.h b/heron/tmaster/src/cpp/manager/stateful-controller.h index 6ceda37..4e3b618 100644 --- a/heron/tmaster/src/cpp/manager/stateful-controller.h +++ b/heron/tmaster/src/cpp/manager/stateful-controller.h @@ -58,7 +58,8 @@ class StatefulController { shared_ptr<heron::common::HeronStateMgr> _state_mgr, std::chrono::high_resolution_clock::time_point _tmaster_start_time, shared_ptr<common::MetricsMgrSt> _metrics_manager_client, - std::function<void(std::string)> _ckpt_save_watcher); + std::function<void(const proto::ckptmgr::StatefulConsistentCheckpoints&)> + _ckpt_save_watcher); virtual ~StatefulController(); // Start a new restore process void StartRestore(const StMgrMap& _stmgrs, bool _ignore_prev_checkpoints); @@ -102,7 +103,7 @@ class StatefulController { unique_ptr<StatefulRestorer> restorer_; shared_ptr<common::MetricsMgrSt> metrics_manager_client_; shared_ptr<common::MultiCountMetric> count_metrics_; - std::function<void(std::string)> ckpt_save_watcher_; + std::function<void(const proto::ckptmgr::StatefulConsistentCheckpoints&)> ckpt_save_watcher_; }; } // namespace tmaster } // namespace heron diff --git a/heron/tmaster/src/cpp/manager/stmgrstate.cpp b/heron/tmaster/src/cpp/manager/stmgrstate.cpp index 46be97d..b144244 100644 --- a/heron/tmaster/src/cpp/manager/stmgrstate.cpp +++ b/heron/tmaster/src/cpp/manager/stmgrstate.cpp @@ -107,10 +107,17 @@ void 
StMgrState::NewPhysicalPlan(const proto::system::PhysicalPlan& _pplan) { } void StMgrState::NewStatefulCheckpoint(const proto::ckptmgr::StartStatefulCheckpoint& _request) { - LOG(INFO) << "Sending a new stateful checkpoint request to stmgr" << stmgr_->id(); + LOG(INFO) << "Sending a new stateful checkpoint request to stmgr: " << stmgr_->id(); server_.SendMessage(connection_, _request); } +void StMgrState::SendCheckpointSavedMessage( + const proto::ckptmgr::StatefulConsistentCheckpointSaved &_msg) { + LOG(INFO) << "Sending checkpoint saved message to stmgr: " << stmgr_->id() << " " + << "for checkpoint: " << _msg.consistent_checkpoint().checkpoint_id(); + server_.SendMessage(connection_, _msg); +} + /* void StMgrState::AddAssignment(const std::vector<pair<string, sp_int32> >& _assignments, diff --git a/heron/tmaster/src/cpp/manager/stmgrstate.h b/heron/tmaster/src/cpp/manager/stmgrstate.h index 270d93a..d9d9159 100644 --- a/heron/tmaster/src/cpp/manager/stmgrstate.h +++ b/heron/tmaster/src/cpp/manager/stmgrstate.h @@ -70,6 +70,8 @@ class StMgrState { // Send stateful checkpoint message to the stmgr void NewStatefulCheckpoint(const proto::ckptmgr::StartStatefulCheckpoint& _request); + void SendCheckpointSavedMessage(const proto::ckptmgr::StatefulConsistentCheckpointSaved &_msg); + bool TimedOut() const; diff --git a/heron/tmaster/src/cpp/manager/tmaster.cpp b/heron/tmaster/src/cpp/manager/tmaster.cpp index 304634f..36b7845 100644 --- a/heron/tmaster/src/cpp/manager/tmaster.cpp +++ b/heron/tmaster/src/cpp/manager/tmaster.cpp @@ -398,8 +398,8 @@ void TMaster::SetupStatefulController( config::TopologyConfigHelper::GetStatefulCheckpointIntervalSecsWithDefault(*topology_, 300); CHECK_GT(stateful_checkpoint_interval, 0); - auto cb = [this](std::string _oldest_ckptid) { - this->HandleStatefulCheckpointSave(_oldest_ckptid); + auto cb = [this](const proto::ckptmgr::StatefulConsistentCheckpoints& new_ckpts) { + this->HandleStatefulCheckpointSave(new_ckpts); }; // Instantiate 
the stateful restorer/coordinator stateful_controller_ = make_unique<StatefulController>(topology_->name(), _ckpt, state_mgr_, @@ -616,8 +616,22 @@ void TMaster::CleanAllStatefulCheckpoint() { ckptmgr_client_->SendCleanStatefulCheckpointRequest("", true); } -void TMaster::HandleStatefulCheckpointSave(const std::string& _oldest_ckpt) { - ckptmgr_client_->SendCleanStatefulCheckpointRequest(_oldest_ckpt, false); +void TMaster::HandleStatefulCheckpointSave( + const proto::ckptmgr::StatefulConsistentCheckpoints &new_ckpts) { + // broadcast globally consistent checkpoint completion + proto::ckptmgr::StatefulConsistentCheckpointSaved msg; + msg.mutable_consistent_checkpoint()->CopyFrom(new_ckpts.consistent_checkpoints(0)); + + for (auto & stmgr : stmgrs_) { + stmgr.second->SendCheckpointSavedMessage(msg); + } + + // clean oldest checkpoint on save + std::string oldest_ckpt_id = + new_ckpts.consistent_checkpoints(new_ckpts.consistent_checkpoints_size() - 1) + .checkpoint_id(); + + ckptmgr_client_->SendCleanStatefulCheckpointRequest(oldest_ckpt_id, false); } // Called when ckptmgr completes the clean stateful checkpoint request diff --git a/heron/tmaster/src/cpp/manager/tmaster.h b/heron/tmaster/src/cpp/manager/tmaster.h index d37fbaa..3bdd25e 100644 --- a/heron/tmaster/src/cpp/manager/tmaster.h +++ b/heron/tmaster/src/cpp/manager/tmaster.h @@ -195,7 +195,8 @@ class TMaster { const ComponentConfigMap& _config); // Function called when a new stateful ckpt record is saved - void HandleStatefulCheckpointSave(const std::string& _oldest_ckpt); + void HandleStatefulCheckpointSave( + const proto::ckptmgr::StatefulConsistentCheckpoints &new_ckpts); // Function called to kill container void KillContainer(const std::string& host_name,