This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new ea96a34  [STREAMCOMP-2885] broadcast GCC completition (#3330)
ea96a34 is described below

commit ea96a34585e4e02e014277a06b4ba7b4b37a9317
Author: Lawrence Pan <lawrencefei...@gmail.com>
AuthorDate: Fri Sep 27 12:50:44 2019 -0400

    [STREAMCOMP-2885] broadcast GCC completition (#3330)
    
    * amend gitignore
    
    * decouple GCC post-save logic from stateful controller
    
    * tmaster broadcast checkpoint completion
    
    the message will first be sent to all stmgrs, and then each stmgr will forward
    it to every heron instance it connects to.
    
    * heron instance handles global checkpoint saved msg
    
    * expose necessary API for the 2PC support
    
    unit tests for this will be in the following commit
    
    * test: refactor mock physical plan to a builder
    
    * test preSave and postSave hook
    
    * [STREAMCOMP-2916] block execute on postSave
    
    * add stateful tests for spouts
    
    * test preRestore
    
    * rename interface to ITwoPhaseStatefulComponent
    
    * block spout from reading data tuples as well
    
    * fix log format
    
    * do not check interface
    
    this is because the boolean will only be set to true for tasks that
    implement ITwoPhaseStatefulComponent. No need to do the check again.
---
 .gitignore                                         |   5 +
 .../api/topology/ITwoPhaseStatefulComponent.java   |  67 ++++++
 .../java/org/apache/heron/instance/IInstance.java  |  15 ++
 .../apache/heron/instance/InstanceControlMsg.java  |  13 +
 .../src/java/org/apache/heron/instance/Slave.java  |  24 +-
 .../apache/heron/instance/bolt/BoltInstance.java   |  28 ++-
 .../apache/heron/instance/spout/SpoutInstance.java |  38 ++-
 .../apache/heron/network/StreamManagerClient.java  |  14 ++
 heron/instance/tests/java/BUILD                    |   2 +
 .../instance/bolt/BoltStatefulInstanceTest.java    | 261 +++++++++++++++++++++
 .../instance/spout/SpoutStatefulInstanceTest.java  | 243 +++++++++++++++++++
 .../java/org/apache/heron/resource/Constants.java  |   6 +
 .../heron/resource/MockPhysicalPlansBuilder.java   | 155 ++++++++++++
 .../apache/heron/resource/TestStatefulBolt.java    |  69 ++++++
 .../apache/heron/resource/TestStatefulSpout.java   |  76 ++++++
 .../heron/resource/TestTwoPhaseStatefulBolt.java   |  98 ++++++++
 .../heron/resource/TestTwoPhaseStatefulSpout.java  |  97 ++++++++
 .../org/apache/heron/resource/UnitTestHelper.java  | 164 ++++++-------
 heron/proto/ckptmgr.proto                          |   6 +
 heron/stmgr/src/cpp/manager/instance-server.cpp    |   9 +
 heron/stmgr/src/cpp/manager/instance-server.h      |   3 +
 heron/stmgr/src/cpp/manager/stmgr.cpp              |  14 +-
 heron/stmgr/src/cpp/manager/stmgr.h                |   4 +
 heron/stmgr/src/cpp/manager/tmaster-client.cpp     |  11 +-
 heron/stmgr/src/cpp/manager/tmaster-client.h       |  10 +-
 .../src/cpp/manager/stateful-controller.cpp        |  24 +-
 .../tmaster/src/cpp/manager/stateful-controller.h  |   5 +-
 heron/tmaster/src/cpp/manager/stmgrstate.cpp       |   9 +-
 heron/tmaster/src/cpp/manager/stmgrstate.h         |   2 +
 heron/tmaster/src/cpp/manager/tmaster.cpp          |  22 +-
 heron/tmaster/src/cpp/manager/tmaster.h            |   3 +-
 31 files changed, 1387 insertions(+), 110 deletions(-)

diff --git a/.gitignore b/.gitignore
index 3b81340..c599612 100644
--- a/.gitignore
+++ b/.gitignore
@@ -90,6 +90,10 @@ tools/jdk/ijar
 .idea/sqlDataSources.xml
 .idea/dynamic.xml
 
+# Intellij bazel plugin
+.ijwb
+.clwb
+
 ## Maven generated files
 .classpath.txt
 
@@ -134,6 +138,7 @@ website2/website/static/api
 
 # Visual Studio Code
 .vscode
+vs.code-workspace
 
 # integration_test
 results/
diff --git 
a/heron/api/src/java/org/apache/heron/api/topology/ITwoPhaseStatefulComponent.java
 
b/heron/api/src/java/org/apache/heron/api/topology/ITwoPhaseStatefulComponent.java
new file mode 100644
index 0000000..6dc88c3
--- /dev/null
+++ 
b/heron/api/src/java/org/apache/heron/api/topology/ITwoPhaseStatefulComponent.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.api.topology;
+
+import java.io.Serializable;
+
+/**
+ * Defines a stateful component that is aware of Heron topology's "two-phase commit".
+ *
+ * Note that saving a distributed checkpoint across all tasks is the "prepare" phase of the
+ * two-phase commit algorithm. When a distributed checkpoint is done, all tasks agree that they
+ * will not roll back to a time before that distributed checkpoint, and the "prepare" phase is
+ * complete.
+ *
+ * When the "prepare" phase is complete, Heron will invoke the "postSave" hook to signal the
+ * beginning of the "commit" phase. If a failure occurs during the "prepare" phase,
+ * Heron will invoke the "preRestore" hook to signal that the two-phase commit is aborted, and the
+ * topology will be rolled back to the previous checkpoint.
+ *
+ * Note that the commit phase finishes once the postSave hook exits successfully. Then, the
+ * prepare phase of the following checkpoint will begin.
+ *
+ * In addition, for two-phase stateful components specifically, Heron will not execute (for bolts)
+ * or produce (for spouts) tuples between preSave and postSave. This guarantees that the prepare
+ * phase of the next checkpoint will not overlap with the commit phase of the current checkpoint
+ * (i.e. we block execution of tuples from the next checkpoint until the commit phase is done).
+ *
+ * See the end-to-end effectively-once design doc (linked in the PR of this commit) for more
+ * details.
+ */
+public interface ITwoPhaseStatefulComponent<K extends Serializable, V extends 
Serializable>
+    extends IStatefulComponent<K, V> {
+
+  /**
+   * This is a hook for the component to perform some actions after a checkpoint is persisted
+   * successfully for all components in the topology.
+   *
+   * @param checkpointId the ID of the checkpoint
+   */
+  void postSave(String checkpointId);
+
+  /**
+   * This is a hook for the component to perform some actions (e.g. state clean-up) before the
+   * framework attempts to delete the component and restore it to a previously-saved checkpoint.
+   *
+   * @param checkpointId the ID of the checkpoint that the component is being restored to
+   */
+  void preRestore(String checkpointId);
+
+}
diff --git a/heron/instance/src/java/org/apache/heron/instance/IInstance.java 
b/heron/instance/src/java/org/apache/heron/instance/IInstance.java
index de7fbd0..9671a1a 100644
--- a/heron/instance/src/java/org/apache/heron/instance/IInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/IInstance.java
@@ -58,6 +58,21 @@ public interface IInstance {
   void clean();
 
   /**
+   * Inform the Instance that the framework will clean, stop, and delete the instance
+   * in order to restore its state to a previously-saved checkpoint.
+   *
+   * @param checkpointId the ID of the checkpoint the instance will be restoring to
+   */
+  void preRestore(String checkpointId);
+
+  /**
+   * Inform the Instance that a particular checkpoint has become globally consistent.
+   *
+   * @param checkpointId the ID of the checkpoint that became globally consistent
+   */
+  void onCheckpointSaved(String checkpointId);
+
+  /**
    * Destroy the whole IInstance.
    * Notice: It should only be called when the whole program is
    * exiting. And in fact, this method should never be called.
diff --git 
a/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java 
b/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java
index 789f53b..c58c384 100644
--- a/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java
+++ b/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java
@@ -26,11 +26,13 @@ public final class InstanceControlMsg {
   private PhysicalPlanHelper newPhysicalPlanHelper;
   private CheckpointManager.RestoreInstanceStateRequest 
restoreInstanceStateRequest;
   private CheckpointManager.StartInstanceStatefulProcessing 
startInstanceStatefulProcessing;
+  private CheckpointManager.StatefulConsistentCheckpointSaved 
statefulConsistentCheckpointSaved;
 
   private InstanceControlMsg(Builder builder) {
     this.newPhysicalPlanHelper = builder.newPhysicalPlanHelper;
     this.restoreInstanceStateRequest = builder.restoreInstanceStateRequest;
     this.startInstanceStatefulProcessing = 
builder.startInstanceStatefulProcessing;
+    this.statefulConsistentCheckpointSaved = 
builder.statefulConsistentCheckpointSaved;
   }
 
   public static Builder newBuilder() {
@@ -41,6 +43,10 @@ public final class InstanceControlMsg {
     return newPhysicalPlanHelper;
   }
 
+  public CheckpointManager.StatefulConsistentCheckpointSaved 
getStatefulCheckpointSavedMessage() {
+    return this.statefulConsistentCheckpointSaved;
+  }
+
   public boolean isNewPhysicalPlanHelper() {
     return newPhysicalPlanHelper != null;
   }
@@ -65,6 +71,7 @@ public final class InstanceControlMsg {
     private PhysicalPlanHelper newPhysicalPlanHelper;
     private CheckpointManager.RestoreInstanceStateRequest 
restoreInstanceStateRequest;
     private CheckpointManager.StartInstanceStatefulProcessing 
startInstanceStatefulProcessing;
+    private CheckpointManager.StatefulConsistentCheckpointSaved 
statefulConsistentCheckpointSaved;
 
     private Builder() {
 
@@ -87,6 +94,12 @@ public final class InstanceControlMsg {
       return this;
     }
 
+    public Builder setStatefulCheckpointSaved(
+        CheckpointManager.StatefulConsistentCheckpointSaved message) {
+      this.statefulConsistentCheckpointSaved = message;
+      return this;
+    }
+
     public InstanceControlMsg build() {
       return new InstanceControlMsg(this);
     }
diff --git a/heron/instance/src/java/org/apache/heron/instance/Slave.java 
b/heron/instance/src/java/org/apache/heron/instance/Slave.java
index 5b85fc6..4e57772 100644
--- a/heron/instance/src/java/org/apache/heron/instance/Slave.java
+++ b/heron/instance/src/java/org/apache/heron/instance/Slave.java
@@ -123,6 +123,16 @@ public class Slave implements Runnable, AutoCloseable {
           if (instanceControlMsg.isNewPhysicalPlanHelper()) {
             handleNewPhysicalPlan(instanceControlMsg);
           }
+
+          // When a checkpoint becomes "globally consistent"
+          if (instanceControlMsg.getStatefulCheckpointSavedMessage() != null) {
+            String checkpointId = instanceControlMsg
+                .getStatefulCheckpointSavedMessage()
+                .getConsistentCheckpoint()
+                .getCheckpointId();
+
+            handleGlobalCheckpointConsistent(checkpointId);
+          }
         }
       }
     };
@@ -130,6 +140,11 @@ public class Slave implements Runnable, AutoCloseable {
     slaveLooper.addTasksOnWakeup(handleControlMessageTask);
   }
 
+  private void handleGlobalCheckpointConsistent(String checkpointId) {
+    LOG.log(Level.INFO, "checkpoint: {0} has become globally consistent", 
checkpointId);
+    instance.onCheckpointSaved(checkpointId);
+  }
+
   private void resetCurrentAssignment() {
     helper.setTopologyContext(metricsCollector);
     instance = helper.getMySpout() != null
@@ -262,7 +277,7 @@ public class Slave implements Runnable, AutoCloseable {
     startInstanceIfNeeded();
   }
 
-  private void cleanAndStopSlave() {
+  private void cleanAndStopSlaveBeforeRestore(String checkpointId) {
     // Clear all queues
     streamInCommunicator.clear();
     streamOutCommunicator.clear();
@@ -275,6 +290,7 @@ public class Slave implements Runnable, AutoCloseable {
     slaveLooper.clearTimers();
 
     if (instance != null) {
+      instance.preRestore(checkpointId);
       instance.clean();
     }
 
@@ -294,9 +310,13 @@ public class Slave implements Runnable, AutoCloseable {
   private void handleRestoreInstanceStateRequest(InstanceControlMsg 
instanceControlMsg) {
     CheckpointManager.RestoreInstanceStateRequest request =
         instanceControlMsg.getRestoreInstanceStateRequest();
+
+    // ID of the checkpoint we are restoring to
+    String checkpointId = request.getState().getCheckpointId();
+
     // Clean buffers and unregister tasks in slave looper
     if (isInstanceStarted) {
-      cleanAndStopSlave();
+      cleanAndStopSlaveBeforeRestore(checkpointId);
     }
 
     // Restore the state
diff --git 
a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java 
b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
index 9cc177a..a53ed3d 100644
--- a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
@@ -37,6 +37,7 @@ import org.apache.heron.api.metric.GlobalMetrics;
 import org.apache.heron.api.serializer.IPluggableSerializer;
 import org.apache.heron.api.state.State;
 import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
 import org.apache.heron.api.topology.IUpdatable;
 import org.apache.heron.api.utils.Utils;
 import org.apache.heron.common.basics.Communicator;
@@ -74,6 +75,9 @@ public class BoltInstance implements IInstance {
 
   private State<Serializable, Serializable> instanceState;
 
+  // default to false, can only be toggled to true if bolt implements ITwoPhaseStatefulComponent
+  private boolean waitingForCheckpointSaved;
+
   // The reference to topology's config
   private final Map<String, Object> config;
 
@@ -110,6 +114,8 @@ public class BoltInstance implements IInstance {
             
String.valueOf(config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE_LOCATION)),
             helper.getMyInstanceId());
 
+    this.waitingForCheckpointSaved = false;
+
     if (helper.getMyBolt() == null) {
       throw new RuntimeException("HeronBoltInstance has no bolt in physical 
plan.");
     }
@@ -162,10 +168,12 @@ public class BoltInstance implements IInstance {
     // so that topology emit, ack, fail are thread safe
     collector.lock.lock();
     try {
-      // Checkpoint
       if (bolt instanceof IStatefulComponent) {
         ((IStatefulComponent) bolt).preSave(checkpointId);
       }
+      if (bolt instanceof ITwoPhaseStatefulComponent) {
+        waitingForCheckpointSaved = true;
+      }
       collector.sendOutState(instanceState, checkpointId, spillState, 
spillStateLocation);
     } finally {
       collector.lock.unlock();
@@ -214,6 +222,21 @@ public class BoltInstance implements IInstance {
   }
 
   @Override
+  public void preRestore(String checkpointId) {
+    if (bolt instanceof ITwoPhaseStatefulComponent) {
+      ((ITwoPhaseStatefulComponent) bolt).preRestore(checkpointId);
+    }
+  }
+
+  @Override
+  public void onCheckpointSaved(String checkpointId) {
+    if (bolt instanceof ITwoPhaseStatefulComponent) {
+      ((ITwoPhaseStatefulComponent) bolt).postSave(checkpointId);
+      waitingForCheckpointSaved = false;
+    }
+  }
+
+  @Override
   public void clean() {
     // Invoke clean up hook before clean() is called
     helper.getTopologyContext().invokeHookCleanup();
@@ -238,6 +261,7 @@ public class BoltInstance implements IInstance {
       @Override
       public void run() {
         boltMetrics.updateTaskRunCount();
+
         // Back-pressure -- only when we could send out tuples will we read & 
execute tuples
         if (collector.isOutQueuesAvailable()) {
           boltMetrics.updateExecutionCount();
@@ -271,7 +295,7 @@ public class BoltInstance implements IInstance {
 
     long startOfCycle = System.nanoTime();
     // Read data from in Queues
-    while (!inQueue.isEmpty()) {
+    while (!inQueue.isEmpty() && !waitingForCheckpointSaved) {
       Message msg = inQueue.poll();
 
       if (msg instanceof CheckpointManager.InitiateStatefulCheckpoint) {
diff --git 
a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java 
b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
index 53675c4..e9a0e13 100644
--- a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
@@ -36,6 +36,7 @@ import org.apache.heron.api.spout.ISpout;
 import org.apache.heron.api.spout.SpoutOutputCollector;
 import org.apache.heron.api.state.State;
 import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
 import org.apache.heron.api.topology.IUpdatable;
 import org.apache.heron.api.utils.Utils;
 import org.apache.heron.common.basics.ByteAmount;
@@ -71,6 +72,9 @@ public class SpoutInstance implements IInstance {
   private final boolean spillState;
   private final String spillStateLocation;
 
+  // default to false, can only be toggled to true if spout implements ITwoPhaseStatefulComponent
+  private boolean waitingForCheckpointSaved;
+
   private State<Serializable, Serializable> instanceState;
 
   private final SlaveLooper looper;
@@ -110,6 +114,8 @@ public class SpoutInstance implements IInstance {
         
String.valueOf(config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE_LOCATION)),
         helper.getMyInstanceId());
 
+    this.waitingForCheckpointSaved = false;
+
     LOG.info("Is this topology stateful: " + isTopologyStateful);
 
     if (helper.getMySpout() == null) {
@@ -171,10 +177,15 @@ public class SpoutInstance implements IInstance {
       if (spout instanceof IStatefulComponent) {
         ((IStatefulComponent) spout).preSave(checkpointId);
       }
+
+      if (spout instanceof ITwoPhaseStatefulComponent) {
+        waitingForCheckpointSaved = true;
+      }
       collector.sendOutState(instanceState, checkpointId, spillState, 
spillStateLocation);
     } finally {
       collector.lock.unlock();
     }
+
     LOG.info("State persisted for checkpoint: " + checkpointId);
   }
 
@@ -219,6 +230,21 @@ public class SpoutInstance implements IInstance {
   }
 
   @Override
+  public void preRestore(String checkpointId) {
+    if (spout instanceof ITwoPhaseStatefulComponent) {
+      ((ITwoPhaseStatefulComponent) spout).preRestore(checkpointId);
+    }
+  }
+
+  @Override
+  public void onCheckpointSaved(String checkpointId) {
+    if (spout instanceof ITwoPhaseStatefulComponent) {
+      ((ITwoPhaseStatefulComponent) spout).postSave(checkpointId);
+      waitingForCheckpointSaved = false;
+    }
+  }
+
+  @Override
   public void clean() {
     // Invoke clean up hook before clean() is called
     helper.getTopologyContext().invokeHookCleanup();
@@ -257,6 +283,9 @@ public class SpoutInstance implements IInstance {
       public void run() {
         spoutMetrics.updateTaskRunCount();
 
+        // Check if we have any message to process anyway
+        readTuplesAndExecute(streamInQueue);
+
         // Check whether we should produce more tuples
         if (isProduceTuple()) {
           spoutMetrics.updateProduceTupleCount();
@@ -272,9 +301,6 @@ public class SpoutInstance implements IInstance {
           spoutMetrics.updateOutQueueFullCount();
         }
 
-        // Check if we have any message to process anyway
-        readTuplesAndExecute(streamInQueue);
-
         if (ackEnabled) {
           // Update the pending-to-be-acked tuples counts
           spoutMetrics.updatePendingTuplesCount(collector.numInFlight());
@@ -330,12 +356,14 @@ public class SpoutInstance implements IInstance {
    * It is allowed in:
    * 1. Outgoing Stream queue is available
    * 2. Topology State is RUNNING
+   * 3. If the Spout implements ITwoPhaseStatefulComponent, not waiting for checkpoint saved message
    *
    * @return true to allow produceTuple() to be invoked
    */
   private boolean isProduceTuple() {
     return collector.isOutQueuesAvailable()
-        && helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING);
+        && helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING)
+        && !waitingForCheckpointSaved;
   }
 
   protected void produceTuple() {
@@ -435,7 +463,7 @@ public class SpoutInstance implements IInstance {
     long startOfCycle = System.nanoTime();
     Duration spoutAckBatchTime = systemConfig.getInstanceAckBatchTime();
 
-    while (!inQueue.isEmpty()) {
+    while (!inQueue.isEmpty() && !waitingForCheckpointSaved) {
       Message msg = inQueue.poll();
 
       if (msg instanceof CheckpointManager.InitiateStatefulCheckpoint) {
diff --git 
a/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java 
b/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java
index 69b2493..fb9212b 100644
--- a/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java
+++ b/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java
@@ -120,6 +120,7 @@ public class StreamManagerClient extends HeronClient {
     
registerOnMessage(CheckpointManager.InitiateStatefulCheckpoint.newBuilder());
     
registerOnMessage(CheckpointManager.RestoreInstanceStateRequest.newBuilder());
     
registerOnMessage(CheckpointManager.StartInstanceStatefulProcessing.newBuilder());
+    
registerOnMessage(CheckpointManager.StatefulConsistentCheckpointSaved.newBuilder());
   }
 
 
@@ -205,6 +206,8 @@ public class StreamManagerClient extends HeronClient {
       
handleRestoreInstanceStateRequest((CheckpointManager.RestoreInstanceStateRequest)
 message);
     } else if (message instanceof 
CheckpointManager.StartInstanceStatefulProcessing) {
       
handleStartStatefulRequest((CheckpointManager.StartInstanceStatefulProcessing) 
message);
+    } else if (message instanceof 
CheckpointManager.StatefulConsistentCheckpointSaved) {
+      
handleCheckpointSaved((CheckpointManager.StatefulConsistentCheckpointSaved) 
message);
     } else {
       throw new RuntimeException("Unknown kind of message received from Stream 
Manager");
     }
@@ -286,6 +289,17 @@ public class StreamManagerClient extends HeronClient {
     inControlQueue.offer(instanceControlMsg);
   }
 
+  private void handleCheckpointSaved(
+      CheckpointManager.StatefulConsistentCheckpointSaved message) {
+    LOG.info("Received a StatefulCheckpointSaved message with checkpoint id: "
+        + message.getConsistentCheckpoint().getCheckpointId());
+
+    InstanceControlMsg instanceControlMsg = InstanceControlMsg.newBuilder()
+        .setStatefulCheckpointSaved(message)
+        .build();
+    inControlQueue.offer(instanceControlMsg);
+  }
+
   private void handleRestoreInstanceStateRequest(
       CheckpointManager.RestoreInstanceStateRequest request) {
     LOG.info("Received a RestoreInstanceState request with checkpoint id: "
diff --git a/heron/instance/tests/java/BUILD b/heron/instance/tests/java/BUILD
index 0c79300..23b2186 100644
--- a/heron/instance/tests/java/BUILD
+++ b/heron/instance/tests/java/BUILD
@@ -24,8 +24,10 @@ java_tests(
         "org.apache.heron.grouping.EmitDirectBoltTest",
         "org.apache.heron.grouping.EmitDirectSpoutTest",
         "org.apache.heron.instance.bolt.BoltInstanceTest",
+        "org.apache.heron.instance.bolt.BoltStatefulInstanceTest",
         "org.apache.heron.instance.spout.ActivateDeactivateTest",
         "org.apache.heron.instance.spout.SpoutInstanceTest",
+        "org.apache.heron.instance.spout.SpoutStatefulInstanceTest",
         "org.apache.heron.metrics.GlobalMetricsTest",
         "org.apache.heron.metrics.MultiAssignableMetricTest",
         "org.apache.heron.network.ConnectTest",
diff --git 
a/heron/instance/tests/java/org/apache/heron/instance/bolt/BoltStatefulInstanceTest.java
 
b/heron/instance/tests/java/org/apache/heron/instance/bolt/BoltStatefulInstanceTest.java
new file mode 100644
index 0000000..d5eac2c
--- /dev/null
+++ 
b/heron/instance/tests/java/org/apache/heron/instance/bolt/BoltStatefulInstanceTest.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.instance.bolt;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.protobuf.ByteString;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.IRichBolt;
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.serializer.IPluggableSerializer;
+import org.apache.heron.api.serializer.JavaSerializer;
+import org.apache.heron.common.basics.SingletonRegistry;
+import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
+import org.apache.heron.instance.InstanceControlMsg;
+import org.apache.heron.instance.SlaveTester;
+import org.apache.heron.proto.system.HeronTuples;
+import org.apache.heron.proto.system.PhysicalPlans;
+import org.apache.heron.resource.Constants;
+import org.apache.heron.resource.MockPhysicalPlansBuilder;
+import org.apache.heron.resource.TestSpout;
+import org.apache.heron.resource.TestStatefulBolt;
+import org.apache.heron.resource.TestTwoPhaseStatefulBolt;
+import org.apache.heron.resource.UnitTestHelper;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test if stateful bolt is able to respond to incoming control/data tuples as 
expected.
+ */
+public class BoltStatefulInstanceTest {
+  private SlaveTester slaveTester;
+  private static IPluggableSerializer serializer = new JavaSerializer();
+
+  @Before
+  public void before() {
+    slaveTester = new SlaveTester();
+    slaveTester.start();
+  }
+
+  @After
+  public void after() throws NoSuchFieldException, IllegalAccessException {
+    slaveTester.stop();
+  }
+
+  @Test
+  public void testPreSaveAndPostSave() throws Exception {
+    CountDownLatch preSaveLatch = new CountDownLatch(1);
+    CountDownLatch postSaveLatch = new CountDownLatch(1);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, 
preSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, 
postSaveLatch);
+
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    
slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt());
+
+    // initially neither preSave nor postSave has been invoked
+    assertEquals(1, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+
+    // this should invoke preSave
+    
slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+    assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+
+    // this should invoke postSave
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0",
 "p0"));
+    assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(0, postSaveLatch.getCount());
+  }
+
+  @Test
+  public void testPreRestore() throws InterruptedException {
+    CountDownLatch preRestoreLatch = new CountDownLatch(1);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRERESTORE_LATCH, 
preRestoreLatch);
+
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    
slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt());
+
+    assertEquals(1, preRestoreLatch.getCount());
+
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("cx"));
+
+    assertTrue(preRestoreLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertEquals(0, preRestoreLatch.getCount());
+  }
+
+  /**
+   * Ensure that for ITwoPhaseStatefulComponent bolts, after a preSave, execute will not be invoked
+   * until the corresponding postSave is called.
+   */
+  @Test
+  public void testPostSaveBlockExecute() throws Exception {
+    CountDownLatch preSaveLatch = new CountDownLatch(1);
+    CountDownLatch postSaveLatch = new CountDownLatch(1);
+
+    CountDownLatch executeLatch = new CountDownLatch(1); // expect to execute one tuple
+
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, 
preSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, 
postSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.EXECUTE_LATCH, 
executeLatch);
+
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    
slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt());
+
+    // initially neither preSave nor postSave has been invoked
+    assertEquals(1, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+    assertEquals(1, executeLatch.getCount());
+
+    // this should invoke preSave
+    
slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+
+    // put a data tuple into the inStreamQueue
+    slaveTester.getInStreamQueue().offer(buildTupleSet());
+
+    assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+    assertEquals(1, executeLatch.getCount());
+
+    // Wait for a bounded amount of time, assert that the tuple will not execute as it is
+    // blocked on postSave. This is because we only want to allow one uncommitted "transaction" on
+    // each task. See the design doc for more details.
+    assertFalse(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+    assertEquals(1, executeLatch.getCount());
+
+    // this should invoke postSave
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0",
 "p0"));
+    assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertTrue(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(0, postSaveLatch.getCount());
+    assertEquals(0, executeLatch.getCount());
+  }
+
+  /**
+   * Ensure that execute is not held back by postSave for bolts that 
don't implement
+   * ITwoPhaseStatefulComponent
+   */
+  @Test
+  public void testExecuteNotBlocked() throws Exception {
+    CountDownLatch preSaveLatch = new CountDownLatch(1);
+    CountDownLatch executeLatch = new CountDownLatch(1); // expect to execute 
one tuple
+
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, 
preSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.EXECUTE_LATCH, 
executeLatch);
+
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    
slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageForStatefulBolt());
+
+    // initially neither preSave nor execute has been invoked yet
+    assertEquals(1, preSaveLatch.getCount());
+    assertEquals(1, executeLatch.getCount());
+
+    // this should invoke preSave
+    
slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+
+    // put a data tuple into the inStreamQueue
+    slaveTester.getInStreamQueue().offer(buildTupleSet());
+
+    assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+
+    // no need to wait for postSave as the bolt doesn't implement 
ITwoPhaseStatefulComponent
+    assertTrue(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(0, executeLatch.getCount());
+  }
+
+  // build a tuple set from task 1 holding one data tuple with value "A"
+  private HeronTuples.HeronTupleSet buildTupleSet() {
+    HeronTuples.HeronTupleSet.Builder heronTupleSet = 
HeronTuples.HeronTupleSet.newBuilder();
+    heronTupleSet.setSrcTaskId(1); // NOTE(review): 1 is the bolt's task id; spout is 0
+    HeronTuples.HeronDataTupleSet.Builder dataTupleSet = 
HeronTuples.HeronDataTupleSet.newBuilder();
+    TopologyAPI.StreamId.Builder streamId = TopologyAPI.StreamId.newBuilder();
+    streamId.setComponentName("test-spout");
+    streamId.setId("default");
+    dataTupleSet.setStream(streamId);
+
+    HeronTuples.HeronDataTuple.Builder dataTuple = 
HeronTuples.HeronDataTuple.newBuilder();
+    dataTuple.setKey(0);
+
+    HeronTuples.RootId.Builder rootId = HeronTuples.RootId.newBuilder();
+    rootId.setKey(0);
+    rootId.setTaskid(0);
+    dataTuple.addRoots(rootId);
+
+    dataTuple.addValues(ByteString.copyFrom(serializer.serialize("A")));
+    dataTupleSet.addTuples(dataTuple);
+    heronTupleSet.setData(dataTupleSet);
+
+    return heronTupleSet.build();
+  }
+
+  private InstanceControlMsg buildPhysicalPlanMessageFor2PCBolt() {
+    return buildPhysicalPlanMessage(new TestTwoPhaseStatefulBolt()); // two-phase (2PC) bolt
+  }
+
+  private InstanceControlMsg buildPhysicalPlanMessageForStatefulBolt() {
+    return buildPhysicalPlanMessage(new TestStatefulBolt()); // non-2PC stateful bolt
+  }
+
+  private InstanceControlMsg buildPhysicalPlanMessage(IRichBolt bolt) { // spout -> bolt plan
+    PhysicalPlans.PhysicalPlan physicalPlan =
+        MockPhysicalPlansBuilder
+            .newBuilder()
+            
.withTopologyConfig(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE, -1)
+            .withTopologyState(TopologyAPI.TopologyState.RUNNING)
+            .withSpoutInstance(
+                "test-spout",
+                0,
+                "spout-id",
+                new TestSpout()
+            )
+            .withBoltInstance(
+                "test-bolt",
+                1,
+                "bolt-id",
+                "test-spout",
+                bolt // instance under test
+            )
+            .build();
+
+    PhysicalPlanHelper ph = new PhysicalPlanHelper(physicalPlan, "bolt-id"); // helper for bolt-id
+
+    return InstanceControlMsg.newBuilder()
+        .setNewPhysicalPlanHelper(ph)
+        .build();
+  }
+}
diff --git 
a/heron/instance/tests/java/org/apache/heron/instance/spout/SpoutStatefulInstanceTest.java
 
b/heron/instance/tests/java/org/apache/heron/instance/spout/SpoutStatefulInstanceTest.java
new file mode 100644
index 0000000..4d65aec
--- /dev/null
+++ 
b/heron/instance/tests/java/org/apache/heron/instance/spout/SpoutStatefulInstanceTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.instance.spout;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.serializer.IPluggableSerializer;
+import org.apache.heron.api.serializer.JavaSerializer;
+import org.apache.heron.api.spout.IRichSpout;
+import org.apache.heron.common.basics.SingletonRegistry;
+import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
+import org.apache.heron.instance.InstanceControlMsg;
+import org.apache.heron.instance.SlaveTester;
+import org.apache.heron.proto.system.PhysicalPlans;
+import org.apache.heron.resource.Constants;
+import org.apache.heron.resource.MockPhysicalPlansBuilder;
+import org.apache.heron.resource.TestBolt;
+import org.apache.heron.resource.TestStatefulSpout;
+import org.apache.heron.resource.TestTwoPhaseStatefulSpout;
+import org.apache.heron.resource.UnitTestHelper;
+
+import static org.junit.Assert.*;
+
+public class SpoutStatefulInstanceTest {
+
+  private SlaveTester slaveTester;
+  private static IPluggableSerializer serializer = new JavaSerializer();
+
+  @Before
+  public void before() {
+    slaveTester = new SlaveTester();
+    slaveTester.start();
+  }
+
+  @After
+  public void after() throws NoSuchFieldException, IllegalAccessException {
+    slaveTester.stop();
+  }
+
+  @Test
+  public void testPreSaveAndPostSave() throws Exception {
+    CountDownLatch preSaveLatch = new CountDownLatch(1);
+    CountDownLatch postSaveLatch = new CountDownLatch(1);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, 
preSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, 
postSaveLatch);
+
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    
slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCSpout());
+
+    // initially neither preSave nor postSave has been invoked yet
+    assertEquals(1, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+
+    // this should invoke preSave
+    
slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+    assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+
+    // this should invoke postSave
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0",
 "p0"));
+    assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(0, postSaveLatch.getCount());
+  }
+
+  @Test
+  public void testPreRestore() throws InterruptedException {
+    CountDownLatch preRestoreLatch = new CountDownLatch(1);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRERESTORE_LATCH, 
preRestoreLatch);
+
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    
slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCSpout());
+
+    assertEquals(1, preRestoreLatch.getCount());
+
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("cx"));
+
+    assertTrue(preRestoreLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertEquals(0, preRestoreLatch.getCount());
+  }
+
+  /**
+   * Ensure that for ITwoPhaseStatefulComponent spouts, after a preSave, 
no tuple is emitted
+   * until the corresponding postSave is called.
+   */
+  @Test
+  public void testPostSaveBlockExecute() throws Exception {
+    // when this boolean is set to false, nextTuple on the spout will be run, 
but the spout will
+    // make sure to not emit any tuples.
+    AtomicBoolean shouldStartEmit = new AtomicBoolean(false);
+    SingletonRegistry.INSTANCE.registerSingleton(
+        Constants.SPOUT_SHOULD_START_EMIT, shouldStartEmit);
+
+    CountDownLatch preSaveLatch = new CountDownLatch(1);
+    CountDownLatch postSaveLatch = new CountDownLatch(1);
+    CountDownLatch emitLatch = new CountDownLatch(1);
+
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, 
preSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, 
postSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.EMIT_LATCH, 
emitLatch);
+
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    
slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCSpout());
+
+    // initially neither preSave nor postSave has been invoked yet
+    assertEquals(1, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+
+    // this should invoke preSave
+    
slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+
+    // wait for preSave before enabling emits, so no tuple can slip out before it
+    assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+
+    // tell the spout to start emitting; it must stay blocked until postSave
+    assertFalse(shouldStartEmit.getAndSet(true));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+    assertEquals(1, emitLatch.getCount());
+
+    // Wait for a bounded amount of time, assert that the spout will not emit 
tuples as it is
+    // blocked on postSave. This is because we only want to allow one 
uncommitted "transaction" on
+    // each task. See the design doc for more details.
+    assertFalse(emitLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+    assertEquals(1, emitLatch.getCount());
+
+    // this should invoke postSave
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0",
 "p0"));
+    assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertTrue(emitLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(0, postSaveLatch.getCount());
+    assertEquals(0, emitLatch.getCount());
+  }
+
+  /**
+   * Ensure that the aforementioned behaviour does not apply for spouts that 
don't implement
+   * ITwoPhaseStatefulComponent
+   */
+  @Test
+  public void testExecuteNotBlocked() throws Exception {
+    // when this boolean is set to false, nextTuple on the spout will be run, 
but the spout will
+    // make sure to not emit any tuples.
+    AtomicBoolean shouldStartEmit = new AtomicBoolean(false);
+    SingletonRegistry.INSTANCE.registerSingleton(
+        Constants.SPOUT_SHOULD_START_EMIT, shouldStartEmit);
+
+    CountDownLatch preSaveLatch = new CountDownLatch(1);
+    CountDownLatch emitLatch = new CountDownLatch(1);
+
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, 
preSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.EMIT_LATCH, 
emitLatch);
+
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    
slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    
slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageForStatefulSpout());
+
+    // initially neither preSave nor emit has happened yet
+    assertEquals(1, preSaveLatch.getCount());
+    assertEquals(1, emitLatch.getCount());
+
+    // this should invoke preSave
+    
slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+
+    assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+
+    // only now tell the spout to start emitting tuples, so any emit happens after preSave
+    assertFalse(shouldStartEmit.getAndSet(true));
+
+    // no need to wait for postSave as the spout doesn't implement 
ITwoPhaseStatefulComponent
+    assertTrue(emitLatch.await(Constants.TEST_WAIT_TIME.toMillis(), 
TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(0, emitLatch.getCount());
+  }
+
+  private InstanceControlMsg buildPhysicalPlanMessageFor2PCSpout() {
+    return buildPhysicalPlanMessage(new TestTwoPhaseStatefulSpout());
+  }
+
+  private InstanceControlMsg buildPhysicalPlanMessageForStatefulSpout() {
+    return buildPhysicalPlanMessage(new TestStatefulSpout());
+  }
+
+  private InstanceControlMsg buildPhysicalPlanMessage(IRichSpout spout) {
+    PhysicalPlans.PhysicalPlan physicalPlan =
+        MockPhysicalPlansBuilder
+            .newBuilder()
+            
.withTopologyConfig(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE, -1)
+            .withTopologyState(TopologyAPI.TopologyState.RUNNING)
+            .withSpoutInstance(
+                "test-spout",
+                0,
+                "spout-id",
+                spout
+            )
+            .withBoltInstance(
+                "test-bolt",
+                1,
+                "bolt-id",
+                "test-spout",
+                new TestBolt()
+            )
+            .build();
+
+    PhysicalPlanHelper ph = new PhysicalPlanHelper(physicalPlan, "spout-id");
+
+    return InstanceControlMsg.newBuilder()
+        .setNewPhysicalPlanHelper(ph)
+        .build();
+  }
+}
diff --git a/heron/instance/tests/java/org/apache/heron/resource/Constants.java 
b/heron/instance/tests/java/org/apache/heron/resource/Constants.java
index 3ffe9b0..9b7d2ce 100644
--- a/heron/instance/tests/java/org/apache/heron/resource/Constants.java
+++ b/heron/instance/tests/java/org/apache/heron/resource/Constants.java
@@ -39,12 +39,18 @@ public final class Constants {
   public static final String ACK_COUNT = "ack-count";
 
   public static final String EXECUTE_LATCH = "execute-latch";
+  public static final String EMIT_LATCH = "emit-latch";
   public static final String FAIL_LATCH = "fail-latch";
   public static final String ACK_LATCH = "ack-latch";
 
   public static final String ACTIVATE_COUNT_LATCH = "activate-count-latch";
   public static final String DEACTIVATE_COUNT_LATCH = "deactivate-count-latch";
 
+  public static final String PRESAVE_LATCH = "preSave-latch";
+  public static final String POSTSAVE_LATCH = "postSave-latch";
+  public static final String PRERESTORE_LATCH = "preRestore-latch"; // distinct from POSTSAVE_LATCH
+  public static final String SPOUT_SHOULD_START_EMIT = 
"spout-should-start-emit";
+
   public static final String RECEIVED_STRING_LIST = "received-string-list";
 
   public static final String HERON_SYSTEM_CONFIG = 
"org.apache.heron.common.config.SystemConfig";
diff --git 
a/heron/instance/tests/java/org/apache/heron/resource/MockPhysicalPlansBuilder.java
 
b/heron/instance/tests/java/org/apache/heron/resource/MockPhysicalPlansBuilder.java
new file mode 100644
index 0000000..14a36cf
--- /dev/null
+++ 
b/heron/instance/tests/java/org/apache/heron/resource/MockPhysicalPlansBuilder.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.resource;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Ignore;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.IRichBolt;
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.spout.IRichSpout;
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.proto.system.PhysicalPlans;
+
+@Ignore
+public final class MockPhysicalPlansBuilder { // fluent builder for mock PhysicalPlans in tests
+
+  private PhysicalPlans.PhysicalPlan.Builder pPlan;
+  private TopologyBuilder topologyBuilder;
+  private List<PhysicalPlans.Instance.Builder> instanceBuilders;
+
+  private Config conf;
+  private TopologyAPI.TopologyState initialTopologyState;
+
+  private MockPhysicalPlansBuilder() { // use newBuilder()
+    pPlan = PhysicalPlans.PhysicalPlan.newBuilder();
+    topologyBuilder = new TopologyBuilder();
+    instanceBuilders = new ArrayList<>();
+
+    conf = null; // set by withTopologyConfig()
+    initialTopologyState = null; // set by withTopologyState()
+  }
+
+  public static MockPhysicalPlansBuilder newBuilder() {
+    return new MockPhysicalPlansBuilder();
+  }
+
+  public MockPhysicalPlansBuilder withTopologyConfig(
+      Config.TopologyReliabilityMode reliabilityMode,
+      int messageTimeout // seconds; -1 skips the timeout settings
+  ) {
+    conf = new Config();
+    conf.setTeamEmail("streaming-comp...@twitter.com");
+    conf.setTeamName("stream-computing");
+    conf.setTopologyProjectName("heron-integration-test");
+    conf.setNumStmgrs(1);
+    conf.setMaxSpoutPending(100);
+    conf.setTopologyReliabilityMode(reliabilityMode);
+    if (messageTimeout != -1) {
+      conf.setMessageTimeoutSecs(messageTimeout);
+      conf.put("topology.enable.message.timeouts", "true");
+    }
+
+    return this;
+  }
+
+  public MockPhysicalPlansBuilder withTopologyState(TopologyAPI.TopologyState 
topologyState) {
+    initialTopologyState = topologyState;
+    return this;
+  }
+
+  public MockPhysicalPlansBuilder withSpoutInstance(
+      String componentName,
+      int taskId,
+      String instanceId,
+      IRichSpout spout
+  ) {
+    PhysicalPlans.InstanceInfo.Builder spoutInstanceInfo = 
PhysicalPlans.InstanceInfo.newBuilder();
+    spoutInstanceInfo.setComponentName(componentName);
+    spoutInstanceInfo.setTaskId(taskId);
+    spoutInstanceInfo.setComponentIndex(0);
+
+    PhysicalPlans.Instance.Builder spoutInstance = 
PhysicalPlans.Instance.newBuilder();
+    spoutInstance.setInstanceId(instanceId);
+    spoutInstance.setStmgrId("stream-manager-id"); // same id as in addStmgrs()
+    spoutInstance.setInfo(spoutInstanceInfo);
+
+    topologyBuilder.setSpout(componentName, spout, 1); // parallelism hint 1
+    instanceBuilders.add(spoutInstance);
+
+    return this;
+  }
+
+  public MockPhysicalPlansBuilder withBoltInstance(
+      String componentName,
+      int taskId,
+      String instanceId,
+      String upStreamComponentId,
+      IRichBolt bolt
+  ) {
+    PhysicalPlans.InstanceInfo.Builder boltInstanceInfo = 
PhysicalPlans.InstanceInfo.newBuilder();
+    boltInstanceInfo.setComponentName(componentName);
+    boltInstanceInfo.setTaskId(taskId);
+    boltInstanceInfo.setComponentIndex(0);
+
+    PhysicalPlans.Instance.Builder boltInstance = 
PhysicalPlans.Instance.newBuilder();
+    boltInstance.setInstanceId(instanceId);
+    boltInstance.setStmgrId("stream-manager-id"); // same id as in addStmgrs()
+    boltInstance.setInfo(boltInstanceInfo);
+
+    topologyBuilder.setBolt(componentName, bolt, 1) // parallelism hint 1
+        .shuffleGrouping(upStreamComponentId);
+    instanceBuilders.add(boltInstance);
+
+    return this;
+  }
+
+  public PhysicalPlans.PhysicalPlan build() {
+    addStmgrs(); // NOTE(review): appends to pPlan; call build() only once
+    pPlan.setTopology(buildTopology());
+
+    for (PhysicalPlans.Instance.Builder b : instanceBuilders) {
+      pPlan.addInstances(b);
+    }
+
+    return pPlan.build();
+  }
+
+  private TopologyAPI.Topology buildTopology() {
+    return topologyBuilder
+        .createTopology()
+        .setName("topology-name")
+        .setConfig(conf) // NOTE(review): null unless withTopologyConfig() ran
+        .setState(initialTopologyState) // NOTE(review): null unless withTopologyState() ran
+        .getTopology();
+  }
+
+  private void addStmgrs() {
+    PhysicalPlans.StMgr.Builder stmgr = PhysicalPlans.StMgr.newBuilder();
+    stmgr.setId("stream-manager-id"); // matches every instance's stmgr id
+    stmgr.setHostName("127.0.0.1");
+    stmgr.setDataPort(8888);
+    stmgr.setLocalEndpoint("endpoint");
+    pPlan.addStmgrs(stmgr);
+  }
+
+}
diff --git 
a/heron/instance/tests/java/org/apache/heron/resource/TestStatefulBolt.java 
b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulBolt.java
new file mode 100644
index 0000000..e16efff
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulBolt.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.resource;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.common.basics.SingletonRegistry;
+
+public class TestStatefulBolt extends BaseRichBolt
+    implements IStatefulComponent<String, String> { // deliberately not ITwoPhaseStatefulComponent
+  @Override
+  public void prepare(
+      Map<String, Object> heronConf,
+      TopologyContext context,
+      OutputCollector collector) {
+  }
+
+  @Override
+  public void execute(Tuple input) { // counts down EXECUTE_LATCH if registered
+    CountDownLatch tupleExecutedLatch =
+        (CountDownLatch) 
SingletonRegistry.INSTANCE.getSingleton(Constants.EXECUTE_LATCH);
+
+    if (tupleExecutedLatch != null) {
+      tupleExecutedLatch.countDown();
+    }
+  }
+
+  @Override
+  public void initState(State<String, String> state) {
+  }
+
+  @Override
+  public void preSave(String checkpointId) { // counts down PRESAVE_LATCH if registered
+    CountDownLatch preSaveLatch =
+        (CountDownLatch) 
SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH);
+
+    if (preSaveLatch != null) {
+      preSaveLatch.countDown();
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+  }
+}
diff --git 
a/heron/instance/tests/java/org/apache/heron/resource/TestStatefulSpout.java 
b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulSpout.java
new file mode 100644
index 0000000..c22a917
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulSpout.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.resource;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.common.basics.SingletonRegistry;
+
+public class TestStatefulSpout extends BaseRichSpout implements 
IStatefulComponent<String, String> {
+  @Override
+  public void open(
+      Map<String, Object> conf,
+      TopologyContext context,
+      SpoutOutputCollector collector) {
+  }
+
+  @Override
+  public void nextTuple() {
+    AtomicBoolean shouldStartEmit =
+        (AtomicBoolean) 
SingletonRegistry.INSTANCE.getSingleton(Constants.SPOUT_SHOULD_START_EMIT);
+
+    if (shouldStartEmit != null && !shouldStartEmit.get()) { // gated by the test
+      return;
+    }
+
+    // simulated emit: nothing is sent, only EMIT_LATCH is counted down
+    CountDownLatch emitLatch =
+        (CountDownLatch) 
SingletonRegistry.INSTANCE.getSingleton(Constants.EMIT_LATCH);
+
+    if (emitLatch != null) {
+      emitLatch.countDown();
+    }
+  }
+
+  @Override
+  public void initState(State<String, String> state) {
+  }
+
+  @Override
+  public void preSave(String checkpointId) { // counts down PRESAVE_LATCH if registered
+    CountDownLatch preSaveLatch =
+        (CountDownLatch) 
SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH);
+
+    if (preSaveLatch != null) {
+      preSaveLatch.countDown();
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+  }
+}
diff --git 
a/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulBolt.java
 
b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulBolt.java
new file mode 100644
index 0000000..b67a9ee
--- /dev/null
+++ 
b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulBolt.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Ignore;
+
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.common.basics.SingletonRegistry;
+
+@Ignore
+public class TestTwoPhaseStatefulBolt extends BaseRichBolt
+    implements ITwoPhaseStatefulComponent<String, String> { // 2PC hooks: postSave, preRestore
+
+  private static final long serialVersionUID = -5160420613503624743L;
+
+  @Override
+  public void prepare(
+      Map<String, Object> map,
+      TopologyContext topologyContext,
+      OutputCollector collector) {
+  }
+
+  @Override
+  public void execute(Tuple tuple) { // counts down EXECUTE_LATCH if registered
+    CountDownLatch tupleExecutedLatch =
+        (CountDownLatch) 
SingletonRegistry.INSTANCE.getSingleton(Constants.EXECUTE_LATCH);
+
+    if (tupleExecutedLatch != null) {
+      tupleExecutedLatch.countDown();
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    outputFieldsDeclarer.declare(new Fields("word")); // declared only; execute() emits nothing
+  }
+
+  @Override
+  public void postSave(String checkpointId) { // counts down POSTSAVE_LATCH if registered
+    CountDownLatch postSaveLatch =
+        (CountDownLatch) 
SingletonRegistry.INSTANCE.getSingleton(Constants.POSTSAVE_LATCH);
+
+    if (postSaveLatch != null) {
+      postSaveLatch.countDown();
+    }
+  }
+
+  @Override
+  public void preRestore(String checkpointId) { // counts down PRERESTORE_LATCH if registered
+    CountDownLatch preRestoreLatch =
+        (CountDownLatch) 
SingletonRegistry.INSTANCE.getSingleton(Constants.PRERESTORE_LATCH);
+
+    if (preRestoreLatch != null) {
+      preRestoreLatch.countDown();
+    }
+  }
+
+  @Override
+  public void initState(State<String, String> state) {
+  }
+
+  @Override
+  public void preSave(String checkpointId) { // counts down PRESAVE_LATCH if registered
+    CountDownLatch preSaveLatch =
+        (CountDownLatch) 
SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH);
+
+    if (preSaveLatch != null) {
+      preSaveLatch.countDown();
+    }
+  }
+}
diff --git 
a/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulSpout.java
 
b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulSpout.java
new file mode 100644
index 0000000..a4b6efd
--- /dev/null
+++ 
b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulSpout.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.resource;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.common.basics.SingletonRegistry;
+
+/**
+ * Test spout implementing {@link ITwoPhaseStatefulComponent}. Each callback the instance
+ * invokes ({@code nextTuple}, {@code preSave}, {@code postSave}, {@code preRestore}) counts
+ * down the {@link CountDownLatch} registered in {@link SingletonRegistry} under the
+ * matching {@code Constants} key, and does nothing when no latch is registered.
+ * {@code nextTuple} is additionally gated by an optional {@link AtomicBoolean}.
+ */
+public class TestTwoPhaseStatefulSpout extends BaseRichSpout
+    implements ITwoPhaseStatefulComponent<String, String> {
+
+  // Explicit UID, matching TestTwoPhaseStatefulBolt, so serialization does not depend on
+  // a compiler-computed value (and -Xlint:serial stays quiet).
+  private static final long serialVersionUID = 8403426618542384120L;
+
+  /**
+   * Counts down the latch registered under {@code latchKey}, if there is one.
+   *
+   * @param latchKey the {@link SingletonRegistry} key of a {@link CountDownLatch}
+   */
+  private static void countDownIfRegistered(String latchKey) {
+    Object latch = SingletonRegistry.INSTANCE.getSingleton(latchKey);
+    if (latch != null) {
+      ((CountDownLatch) latch).countDown();
+    }
+  }
+
+  @Override
+  public void open(
+      Map<String, Object> conf,
+      TopologyContext context,
+      SpoutOutputCollector collector) {
+    // Nothing to set up: the collector is never used because this spout never emits.
+  }
+
+  @Override
+  public void nextTuple() {
+    // When a gate is registered, hold off until the test sets it to true.
+    Object gate = SingletonRegistry.INSTANCE.getSingleton(Constants.SPOUT_SHOULD_START_EMIT);
+    if (gate != null && !((AtomicBoolean) gate).get()) {
+      return;
+    }
+
+    // actually "emit" the tuple
+    countDownIfRegistered(Constants.EMIT_LATCH);
+  }
+
+  @Override
+  public void postSave(String checkpointId) {
+    countDownIfRegistered(Constants.POSTSAVE_LATCH);
+  }
+
+  @Override
+  public void preRestore(String checkpointId) {
+    countDownIfRegistered(Constants.PRERESTORE_LATCH);
+  }
+
+  @Override
+  public void initState(State<String, String> state) {
+    // Intentionally empty: no state is read or restored.
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+    countDownIfRegistered(Constants.PRESAVE_LATCH);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    // No output fields: nothing is ever emitted.
+  }
+}
diff --git 
a/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java 
b/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java
index 904ecab..8e39333 100644
--- a/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java
+++ b/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java
@@ -23,14 +23,17 @@ import java.lang.reflect.Field;
 import java.nio.file.Paths;
 import java.util.Map;
 
+import com.google.protobuf.Message;
+
 import org.junit.Ignore;
 
 import org.apache.heron.api.Config;
 import org.apache.heron.api.generated.TopologyAPI;
-import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.common.basics.SingletonRegistry;
 import org.apache.heron.common.config.SystemConfig;
 import org.apache.heron.common.config.SystemConfigKey;
+import org.apache.heron.instance.InstanceControlMsg;
+import org.apache.heron.proto.ckptmgr.CheckpointManager;
 import org.apache.heron.proto.stmgr.StreamManager;
 import org.apache.heron.proto.system.Common;
 import org.apache.heron.proto.system.PhysicalPlans;
@@ -60,91 +63,36 @@ public final class UnitTestHelper {
+  /**
+   * Builds the physical plan used by the instance unit tests via MockPhysicalPlansBuilder:
+   * a "test-spout" instance ("spout-id") and a "test-bolt" instance ("bolt-id") that is given
+   * "test-spout" as its (presumed) upstream component. The integer arguments 0 and 1 are
+   * presumably the task ids, as in the removed implementation - TODO confirm in the builder.
+   *
+   * @param ackEnabled true selects ATLEAST_ONCE reliability, false selects ATMOST_ONCE
+   * @param messageTimeout forwarded to withTopologyConfig; the removed implementation treated
+   *     -1 as "no message timeout" (presumably still the case - TODO confirm)
+   * @param topologyState the state recorded on the topology
+   * @return the built physical plan
+   */
   public static PhysicalPlans.PhysicalPlan getPhysicalPlan(
       boolean ackEnabled,
       int messageTimeout,
-      TopologyAPI.TopologyState topologyState) {
-    PhysicalPlans.PhysicalPlan.Builder pPlan = 
PhysicalPlans.PhysicalPlan.newBuilder();
-
-    setTopology(pPlan, ackEnabled, messageTimeout, topologyState);
-
-    setInstances(pPlan);
-
-    setStMgr(pPlan);
-
-    return pPlan.build();
+      TopologyAPI.TopologyState topologyState
+  ) {
+    Config.TopologyReliabilityMode reliabilityMode = ackEnabled
+        ? Config.TopologyReliabilityMode.ATLEAST_ONCE
+        : Config.TopologyReliabilityMode.ATMOST_ONCE;
+
+    return MockPhysicalPlansBuilder
+        .newBuilder()
+        .withTopologyConfig(reliabilityMode, messageTimeout)
+        .withTopologyState(topologyState)
+        .withSpoutInstance(
+            "test-spout",
+            0,
+            "spout-id",
+            new TestSpout()
+        )
+        .withBoltInstance(
+            "test-bolt",
+            1,
+            "bolt-id",
+            "test-spout",
+            new TestBolt()
+        )
+        .build();
   }
 
+  /** Same as the three-argument overload, with the topology in the RUNNING state. */
   public static PhysicalPlans.PhysicalPlan getPhysicalPlan(boolean ackEnabled, 
int messageTimeout) {
     return getPhysicalPlan(ackEnabled, messageTimeout, 
TopologyAPI.TopologyState.RUNNING);
   }
 
-  private static void setTopology(PhysicalPlans.PhysicalPlan.Builder pPlan, 
boolean ackEnabled,
-                                  int messageTimeout, 
TopologyAPI.TopologyState topologyState) {
-    TopologyBuilder topologyBuilder = new TopologyBuilder();
-    topologyBuilder.setSpout("test-spout", new TestSpout(), 1);
-    // Here we need case switch to corresponding grouping
-    topologyBuilder.setBolt("test-bolt", new TestBolt(), 
1).shuffleGrouping("test-spout");
-
-    Config conf = new Config();
-    conf.setTeamEmail("streaming-comp...@twitter.com");
-    conf.setTeamName("stream-computing");
-    conf.setTopologyProjectName("heron-integration-test");
-    conf.setNumStmgrs(1);
-    conf.setMaxSpoutPending(100);
-    if (ackEnabled) {
-      
conf.setTopologyReliabilityMode(Config.TopologyReliabilityMode.ATLEAST_ONCE);
-    } else {
-      
conf.setTopologyReliabilityMode(Config.TopologyReliabilityMode.ATMOST_ONCE);
-    }
-    if (messageTimeout != -1) {
-      conf.setMessageTimeoutSecs(messageTimeout);
-      conf.put("topology.enable.message.timeouts", "true");
-    }
-
-    TopologyAPI.Topology fTopology =
-        topologyBuilder.createTopology().
-            setName("topology-name").
-            setConfig(conf).
-            setState(topologyState).
-            getTopology();
-
-    pPlan.setTopology(fTopology);
-  }
-
-  private static void setInstances(PhysicalPlans.PhysicalPlan.Builder pPlan) {
-    // Construct the spoutInstance
-    PhysicalPlans.InstanceInfo.Builder spoutInstanceInfo = 
PhysicalPlans.InstanceInfo.newBuilder();
-    spoutInstanceInfo.setComponentName("test-spout");
-    spoutInstanceInfo.setTaskId(0);
-    spoutInstanceInfo.setComponentIndex(0);
-
-    PhysicalPlans.Instance.Builder spoutInstance = 
PhysicalPlans.Instance.newBuilder();
-    spoutInstance.setInstanceId("spout-id");
-    spoutInstance.setStmgrId("stream-manager-id");
-    spoutInstance.setInfo(spoutInstanceInfo);
-
-    // Construct the boltInstanceInfo
-    PhysicalPlans.InstanceInfo.Builder boltInstanceInfo = 
PhysicalPlans.InstanceInfo.newBuilder();
-    boltInstanceInfo.setComponentName("test-bolt");
-    boltInstanceInfo.setTaskId(1);
-    boltInstanceInfo.setComponentIndex(0);
-
-    PhysicalPlans.Instance.Builder boltInstance = 
PhysicalPlans.Instance.newBuilder();
-    boltInstance.setInstanceId("bolt-id");
-    boltInstance.setStmgrId("stream-manager-id");
-    boltInstance.setInfo(boltInstanceInfo);
-
-    pPlan.addInstances(spoutInstance);
-    pPlan.addInstances(boltInstance);
-  }
-
-  private static void setStMgr(PhysicalPlans.PhysicalPlan.Builder pPlan) {
-    PhysicalPlans.StMgr.Builder stmgr = PhysicalPlans.StMgr.newBuilder();
-    stmgr.setId("stream-manager-id");
-    stmgr.setHostName("127.0.0.1");
-    stmgr.setDataPort(8888);
-    stmgr.setLocalEndpoint("endpoint");
-    pPlan.addStmgrs(stmgr);
-  }
-
   @SuppressWarnings("unchecked")
   public static void clearSingletonRegistry() throws IllegalAccessException, 
NoSuchFieldException {
     // Remove the Singleton by Reflection
@@ -196,4 +144,60 @@ public final class UnitTestHelper {
     return registerInstanceResponse.build();
   }
 
+  /**
+   * Builds an {@code InitiateStatefulCheckpoint} message for {@code checkpointId}.
+   *
+   * @param checkpointId the checkpoint id placed in the message
+   * @return the built message, typed as a generic protobuf {@link Message}
+   */
+  public static Message buildPersistStateMessage(String checkpointId) {
+    return CheckpointManager.InitiateStatefulCheckpoint.newBuilder()
+        .setCheckpointId(checkpointId)
+        .build();
+  }
+
+  /**
+   * Wraps a {@code RestoreInstanceStateRequest}, whose state checkpoint carries only
+   * {@code checkpointId}, in an {@link InstanceControlMsg}.
+   *
+   * @param checkpointId the checkpoint id to restore
+   * @return the control message holding the restore request
+   */
+  public static InstanceControlMsg buildRestoreInstanceState(String checkpointId) {
+    CheckpointManager.InstanceStateCheckpoint.Builder stateToRestore =
+        CheckpointManager.InstanceStateCheckpoint.newBuilder()
+            .setCheckpointId(checkpointId);
+
+    CheckpointManager.RestoreInstanceStateRequest restoreRequest =
+        CheckpointManager.RestoreInstanceStateRequest.newBuilder()
+            .setState(stateToRestore)
+            .build();
+
+    return InstanceControlMsg.newBuilder()
+        .setRestoreInstanceStateRequest(restoreRequest)
+        .build();
+  }
+
+  /**
+   * Wraps a {@code StartInstanceStatefulProcessing} message for {@code checkpointId} in an
+   * {@link InstanceControlMsg}.
+   *
+   * @param checkpointId the checkpoint id processing resumes from
+   * @return the control message holding the start-processing request
+   */
+  public static InstanceControlMsg buildStartInstanceProcessingMessage(String checkpointId) {
+    CheckpointManager.StartInstanceStatefulProcessing startProcessing =
+        CheckpointManager.StartInstanceStatefulProcessing.newBuilder()
+            .setCheckpointId(checkpointId)
+            .build();
+
+    return InstanceControlMsg.newBuilder()
+        .setStartInstanceStatefulProcessing(startProcessing)
+        .build();
+  }
+
+  /**
+   * Wraps a {@code StatefulConsistentCheckpointSaved} notification for the given checkpoint
+   * and packing plan in an {@link InstanceControlMsg}.
+   *
+   * @param checkpointId id of the checkpoint reported as saved
+   * @param packingPlanId packing plan id recorded with that checkpoint
+   * @return the control message holding the checkpoint-saved notification
+   */
+  public static InstanceControlMsg buildCheckpointSavedMessage(
+      String checkpointId,
+      String packingPlanId
+  ) {
+    CheckpointManager.StatefulConsistentCheckpoint savedCheckpoint =
+        CheckpointManager.StatefulConsistentCheckpoint.newBuilder()
+            .setCheckpointId(checkpointId)
+            .setPackingPlanId(packingPlanId)
+            .build();
+
+    CheckpointManager.StatefulConsistentCheckpointSaved notification =
+        CheckpointManager.StatefulConsistentCheckpointSaved.newBuilder()
+            .setConsistentCheckpoint(savedCheckpoint)
+            .build();
+
+    return InstanceControlMsg.newBuilder()
+        .setStatefulCheckpointSaved(notification)
+        .build();
+  }
+
 }
diff --git a/heron/proto/ckptmgr.proto b/heron/proto/ckptmgr.proto
index ba6633a..ea6162c 100644
--- a/heron/proto/ckptmgr.proto
+++ b/heron/proto/ckptmgr.proto
@@ -108,6 +108,12 @@ message StartStatefulCheckpoint {
   required string checkpoint_id = 1;
 }
 
+// Message broadcast by tmaster to every stmgr once a checkpoint becomes
+// "globally consistent"; each stmgr then forwards it to its instances.
+message StatefulConsistentCheckpointSaved {
+  required StatefulConsistentCheckpoint consistent_checkpoint = 1;
+}
+
 // Message sent by tmaster to stmgr asking them to reset their instances
 // to this checkpoint
 message RestoreTopologyStateRequest {
diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp 
b/heron/stmgr/src/cpp/manager/instance-server.cpp
index 6c0f973..9a752de 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -498,6 +498,15 @@ void InstanceServer::BroadcastNewPhysicalPlan(const 
proto::system::PhysicalPlan&
   }
 }
 
+// Forwards the tmaster's checkpoint-saved notification, unchanged, to every
+// instance connection currently held in active_instances_.
+void InstanceServer::BroadcastStatefulCheckpointSaved(
+    const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg) {
+  const std::string& saved_ckpt_id = _msg.consistent_checkpoint().checkpoint_id();
+  for (const auto& conn_to_task : active_instances_) {
+    LOG(INFO) << "Sending checkpoint: " << saved_ckpt_id
+              << " saved message to instance with task_id: " << conn_to_task.second;
+    SendMessage(conn_to_task.first, _msg);
+  }
+}
+
 void InstanceServer::SetRateLimit(const proto::system::PhysicalPlan& _pplan,
                                   const std::string& _component,
                                   Connection* _conn) const {
diff --git a/heron/stmgr/src/cpp/manager/instance-server.h 
b/heron/stmgr/src/cpp/manager/instance-server.h
index df4b5e9..b4a6038 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.h
+++ b/heron/stmgr/src/cpp/manager/instance-server.h
@@ -72,6 +72,9 @@ class InstanceServer : public Server {
 
   void BroadcastNewPhysicalPlan(const proto::system::PhysicalPlan& _pplan);
 
+  void BroadcastStatefulCheckpointSaved(
+      const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg);
+
   virtual bool HaveAllInstancesConnectedToUs() const {
     return active_instances_.size() == expected_instances_.size();
   }
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp 
b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 6510271..aa6625e 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -368,12 +368,18 @@ void 
StMgr::CreateTMasterClient(shared_ptr<proto::tmaster::TMasterLocation> tmas
     this->StartStatefulProcessing(checkpoint_id);
   };
 
+  auto broadcast_checkpoint_saved =
+       [this](const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg) {
+    this->BroadcastCheckpointSaved(_msg);
+  };
+
   tmaster_client_ = make_shared<TMasterClient>(eventLoop_, master_options, 
stmgr_id_, stmgr_host_,
                                       data_port_, local_data_port_, 
shell_port_,
                                       std::move(pplan_watch),
                                       std::move(stateful_checkpoint_watch),
                                       std::move(restore_topology_watch),
-                                      std::move(start_stateful_watch));
+                                      std::move(start_stateful_watch),
+                                      std::move(broadcast_checkpoint_saved));
 }
 
 void StMgr::CreateTupleCache() {
@@ -1080,6 +1086,12 @@ void StMgr::RestoreTopologyState(sp_string 
_checkpoint_id, sp_int64 _restore_txi
   stateful_restorer_->StartRestore(_checkpoint_id, _restore_txid, 
local_taskids, *pplan_);
 }
 
+// Relays tmaster's StatefulConsistentCheckpointSaved notification to every heron
+// instance connected to this stmgr, by delegating to the instance server.
+void StMgr::BroadcastCheckpointSaved(
+    const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg) {
+  auto& server = *instance_server_;
+  server.BroadcastStatefulCheckpointSaved(_msg);
+}
+
 // Called by TmasterClient when it receives directive from tmaster
 // to start processing after having previously recovered the state at 
_checkpoint_id
 void StMgr::StartStatefulProcessing(sp_string _checkpoint_id) {
diff --git a/heron/stmgr/src/cpp/manager/stmgr.h 
b/heron/stmgr/src/cpp/manager/stmgr.h
index decdc39..582ac3a 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr.h
@@ -197,6 +197,10 @@ class StMgr {
   void HandleStatefulRestoreDone(proto::system::StatusCode _status,
                                  std::string _checkpoint_id, sp_int64 
_restore_txid);
 
+  // Called when stmgr received StatefulConsistentCheckpointSaved message from 
TMaster
+  // Then, the stmgr will forward this fact to all heron instances connected 
to it
+  void BroadcastCheckpointSaved(const 
proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg);
+
   // Patch new physical plan with internal hydrated topology but keep new 
topology data:
   // - new topology state
   // - new topology/component config
diff --git a/heron/stmgr/src/cpp/manager/tmaster-client.cpp 
b/heron/stmgr/src/cpp/manager/tmaster-client.cpp
index f1e27d8..657077d 100644
--- a/heron/stmgr/src/cpp/manager/tmaster-client.cpp
+++ b/heron/stmgr/src/cpp/manager/tmaster-client.cpp
@@ -41,7 +41,9 @@ TMasterClient::TMasterClient(shared_ptr<EventLoop> eventLoop, 
const NetworkOptio
                              
VCallback<shared_ptr<proto::system::PhysicalPlan>> _pplan_watch,
                              VCallback<sp_string> _stateful_checkpoint_watch,
                              VCallback<sp_string, sp_int64> 
_restore_topology_watch,
-                             VCallback<sp_string> _start_stateful_watch)
+                             VCallback<sp_string> _start_stateful_watch,
+                             VCallback<const 
proto::ckptmgr::StatefulConsistentCheckpointSaved&>
+                                 _broadcast_checkpoint_saved)
     : Client(eventLoop, _options),
       stmgr_id_(_stmgr_id),
       stmgr_host_(_stmgr_host),
@@ -53,6 +55,7 @@ TMasterClient::TMasterClient(shared_ptr<EventLoop> eventLoop, 
const NetworkOptio
       stateful_checkpoint_watch_(std::move(_stateful_checkpoint_watch)),
       restore_topology_watch_(std::move(_restore_topology_watch)),
       start_stateful_watch_(std::move(_start_stateful_watch)),
+      broadcast_checkpoint_saved_(_broadcast_checkpoint_saved),
       reconnect_timer_id(0),
       heartbeat_timer_id(0),
       reconnect_attempts_(0) {
@@ -74,6 +77,7 @@ TMasterClient::TMasterClient(shared_ptr<EventLoop> eventLoop, 
const NetworkOptio
   InstallMessageHandler(&TMasterClient::HandleStatefulCheckpointMessage);
   InstallMessageHandler(&TMasterClient::HandleRestoreTopologyStateRequest);
   InstallMessageHandler(&TMasterClient::HandleStartStmgrStatefulProcessing);
+  InstallMessageHandler(&TMasterClient::HandleStatefulCheckpointSavedMessage);
 }
 
 TMasterClient::~TMasterClient() {
@@ -303,6 +307,11 @@ void TMasterClient::HandleStartStmgrStatefulProcessing(
   start_stateful_watch_(_message->checkpoint_id());
 }
 
+// Installed as the handler for StatefulConsistentCheckpointSaved messages from
+// tmaster; passes the payload to the callback supplied by the stmgr.
+void TMasterClient::HandleStatefulCheckpointSavedMessage(
+    pool_unique_ptr<proto::ckptmgr::StatefulConsistentCheckpointSaved> _msg) {
+  const auto& saved_notification = *_msg;
+  broadcast_checkpoint_saved_(saved_notification);
+}
+
 void TMasterClient::SendResetTopologyState(const std::string& _dead_stmgr,
                                            int32_t _dead_task,
                                            const std::string& _reason) {
diff --git a/heron/stmgr/src/cpp/manager/tmaster-client.h 
b/heron/stmgr/src/cpp/manager/tmaster-client.h
index ec83406..2457d72 100644
--- a/heron/stmgr/src/cpp/manager/tmaster-client.h
+++ b/heron/stmgr/src/cpp/manager/tmaster-client.h
@@ -42,7 +42,9 @@ class TMasterClient : public Client {
                 VCallback<shared_ptr<proto::system::PhysicalPlan>> 
_pplan_watch,
                 VCallback<sp_string> _stateful_checkpoint_watch,
                 VCallback<sp_string, sp_int64> _restore_topology_watch,
-                VCallback<sp_string> _start_stateful_watch);
+                VCallback<sp_string> _start_stateful_watch,
+                VCallback<const 
proto::ckptmgr::StatefulConsistentCheckpointSaved&>
+                    _broadcast_checkpoint_saved);
   virtual ~TMasterClient();
 
   // Told by the upper layer to disconnect and self destruct
@@ -88,6 +90,9 @@ class TMasterClient : public Client {
   void HandleStartStmgrStatefulProcessing(
           pool_unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _msg);
 
+  void HandleStatefulCheckpointSavedMessage(
+          pool_unique_ptr<proto::ckptmgr::StatefulConsistentCheckpointSaved> 
_msg);
+
   void OnReConnectTimer();
   void OnHeartbeatTimer();
   void SendRegisterRequest();
@@ -116,6 +121,9 @@ class TMasterClient : public Client {
   // We invoke this callback upon receiving a StartStatefulProcessing message 
from tmaster
   // passing in the checkpoint id
   VCallback<sp_string> start_stateful_watch_;
+  // This callback will be invoked upon receiving a 
StatefulConsistentCheckpointSaved message.
+  // We will then forward this message to all the instances connected to this 
stmgr
+  VCallback<const proto::ckptmgr::StatefulConsistentCheckpointSaved&> 
broadcast_checkpoint_saved_;
 
   // Configs to be read
   sp_int32 reconnect_tmaster_interval_sec_;
diff --git a/heron/tmaster/src/cpp/manager/stateful-controller.cpp 
b/heron/tmaster/src/cpp/manager/stateful-controller.cpp
index 31dcf7b..a348c10 100644
--- a/heron/tmaster/src/cpp/manager/stateful-controller.cpp
+++ b/heron/tmaster/src/cpp/manager/stateful-controller.cpp
@@ -55,14 +55,18 @@ StatefulController::StatefulController(const std::string& 
_topology_name,
                shared_ptr<heron::common::HeronStateMgr> _state_mgr,
                std::chrono::high_resolution_clock::time_point 
_tmaster_start_time,
                shared_ptr<common::MetricsMgrSt> _metrics_manager_client,
-               std::function<void(std::string)> _ckpt_save_watcher)
-  : topology_name_(_topology_name), ckpt_record_(std::move(_ckpt)), 
state_mgr_(_state_mgr),
-    metrics_manager_client_(_metrics_manager_client) {
+               std::function<void(const 
proto::ckptmgr::StatefulConsistentCheckpoints&)>
+                   _ckpt_save_watcher)
+  : topology_name_(_topology_name),
+    ckpt_record_(std::move(_ckpt)),
+    state_mgr_(_state_mgr),
+    metrics_manager_client_(_metrics_manager_client),
+    ckpt_save_watcher_(_ckpt_save_watcher) {
   checkpointer_ = make_unique<StatefulCheckpointer>(_tmaster_start_time);
   restorer_ = make_unique<StatefulRestorer>();
   count_metrics_ = make_shared<common::MultiCountMetric>();
+
   metrics_manager_client_->register_metric("__stateful_controller", 
count_metrics_);
-  ckpt_save_watcher_ = _ckpt_save_watcher;
 }
 
 StatefulController::~StatefulController() {
@@ -162,13 +166,13 @@ void StatefulController::HandleCheckpointSave(
         shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _new_ckpt,
         proto::system::StatusCode _status) {
   if (_status == proto::system::OK) {
-    LOG(INFO) << "Successfully saved " << 
_new_ckpt->consistent_checkpoints(0).checkpoint_id()
-              << " as the new globally consistent checkpoint";
     ckpt_record_ = std::move(_new_ckpt);
-    std::string oldest_ckpt =
-        
ckpt_record_->consistent_checkpoints(ckpt_record_->consistent_checkpoints_size()
 - 1)
-          .checkpoint_id();
-    ckpt_save_watcher_(oldest_ckpt);
+
+    LOG(INFO) << "Successfully saved "
+              << ckpt_record_->consistent_checkpoints(0).checkpoint_id()
+              << " as the latest globally consistent checkpoint";
+
+    ckpt_save_watcher_(*ckpt_record_);
   } else {
     LOG(ERROR) << "Error saving " << 
_new_ckpt->consistent_checkpoints(0).checkpoint_id()
               << " as the new globally consistent checkpoint "
diff --git a/heron/tmaster/src/cpp/manager/stateful-controller.h 
b/heron/tmaster/src/cpp/manager/stateful-controller.h
index 6ceda37..4e3b618 100644
--- a/heron/tmaster/src/cpp/manager/stateful-controller.h
+++ b/heron/tmaster/src/cpp/manager/stateful-controller.h
@@ -58,7 +58,8 @@ class StatefulController {
                shared_ptr<heron::common::HeronStateMgr> _state_mgr,
                std::chrono::high_resolution_clock::time_point 
_tmaster_start_time,
                shared_ptr<common::MetricsMgrSt> _metrics_manager_client,
-               std::function<void(std::string)> _ckpt_save_watcher);
+               std::function<void(const 
proto::ckptmgr::StatefulConsistentCheckpoints&)>
+                   _ckpt_save_watcher);
   virtual ~StatefulController();
   // Start a new restore process
   void StartRestore(const StMgrMap& _stmgrs, bool _ignore_prev_checkpoints);
@@ -102,7 +103,7 @@ class StatefulController {
   unique_ptr<StatefulRestorer> restorer_;
   shared_ptr<common::MetricsMgrSt> metrics_manager_client_;
   shared_ptr<common::MultiCountMetric> count_metrics_;
-  std::function<void(std::string)> ckpt_save_watcher_;
+  std::function<void(const proto::ckptmgr::StatefulConsistentCheckpoints&)> 
ckpt_save_watcher_;
 };
 }  // namespace tmaster
 }  // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/stmgrstate.cpp 
b/heron/tmaster/src/cpp/manager/stmgrstate.cpp
index 46be97d..b144244 100644
--- a/heron/tmaster/src/cpp/manager/stmgrstate.cpp
+++ b/heron/tmaster/src/cpp/manager/stmgrstate.cpp
@@ -107,10 +107,17 @@ void StMgrState::NewPhysicalPlan(const 
proto::system::PhysicalPlan& _pplan) {
 }
 
+// Sends _request (a StartStatefulCheckpoint) to this stmgr over connection_.
 void StMgrState::NewStatefulCheckpoint(const 
proto::ckptmgr::StartStatefulCheckpoint& _request) {
-  LOG(INFO) << "Sending a new stateful checkpoint request to stmgr" << 
stmgr_->id();
+  LOG(INFO) << "Sending a new stateful checkpoint request to stmgr: " << 
stmgr_->id();
   server_.SendMessage(connection_, _request);
 }
 
+// Notifies this stmgr, over connection_, that the checkpoint in _msg is now
+// globally consistent.
+void StMgrState::SendCheckpointSavedMessage(
+    const proto::ckptmgr::StatefulConsistentCheckpointSaved &_msg) {
+  const std::string& saved_ckpt_id = _msg.consistent_checkpoint().checkpoint_id();
+  LOG(INFO) << "Sending checkpoint saved message to stmgr: " << stmgr_->id() << " "
+            << "for checkpoint: " << saved_ckpt_id;
+  server_.SendMessage(connection_, _msg);
+}
+
 /*
 void
 StMgrState::AddAssignment(const std::vector<pair<string, sp_int32> >& 
_assignments,
diff --git a/heron/tmaster/src/cpp/manager/stmgrstate.h 
b/heron/tmaster/src/cpp/manager/stmgrstate.h
index 270d93a..d9d9159 100644
--- a/heron/tmaster/src/cpp/manager/stmgrstate.h
+++ b/heron/tmaster/src/cpp/manager/stmgrstate.h
@@ -70,6 +70,8 @@ class StMgrState {
   // Send stateful checkpoint message to the stmgr
   void NewStatefulCheckpoint(const proto::ckptmgr::StartStatefulCheckpoint& 
_request);
 
+  void SendCheckpointSavedMessage(const 
proto::ckptmgr::StatefulConsistentCheckpointSaved &_msg);
+
 
   bool TimedOut() const;
 
diff --git a/heron/tmaster/src/cpp/manager/tmaster.cpp 
b/heron/tmaster/src/cpp/manager/tmaster.cpp
index 304634f..36b7845 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.cpp
+++ b/heron/tmaster/src/cpp/manager/tmaster.cpp
@@ -398,8 +398,8 @@ void TMaster::SetupStatefulController(
        
config::TopologyConfigHelper::GetStatefulCheckpointIntervalSecsWithDefault(*topology_,
 300);
   CHECK_GT(stateful_checkpoint_interval, 0);
 
-  auto cb = [this](std::string _oldest_ckptid) {
-    this->HandleStatefulCheckpointSave(_oldest_ckptid);
+  auto cb = [this](const proto::ckptmgr::StatefulConsistentCheckpoints& 
new_ckpts) {
+    this->HandleStatefulCheckpointSave(new_ckpts);
   };
   // Instantiate the stateful restorer/coordinator
   stateful_controller_ = make_unique<StatefulController>(topology_->name(), 
_ckpt, state_mgr_,
@@ -616,8 +616,22 @@ void TMaster::CleanAllStatefulCheckpoint() {
   ckptmgr_client_->SendCleanStatefulCheckpointRequest("", true);
 }
 
-void TMaster::HandleStatefulCheckpointSave(const std::string& _oldest_ckpt) {
-  ckptmgr_client_->SendCleanStatefulCheckpointRequest(_oldest_ckpt, false);
+// Called by the stateful controller after a new consistent-checkpoint record has
+// been saved. Entry 0 of new_ckpts is the latest globally consistent checkpoint
+// and the last entry is the oldest one. The latest is broadcast to every stmgr,
+// then the ckptmgr is asked to clean up from the oldest.
+void TMaster::HandleStatefulCheckpointSave(
+    const proto::ckptmgr::StatefulConsistentCheckpoints &new_ckpts) {
+  const int num_ckpts = new_ckpts.consistent_checkpoints_size();
+  if (num_ckpts == 0) {
+    // Indexing an empty repeated field is invalid, so there is nothing to
+    // broadcast or clean.
+    LOG(ERROR) << "Received an empty consistent checkpoint record; "
+               << "skipping checkpoint saved broadcast and cleanup";
+    return;
+  }
+
+  // broadcast globally consistent checkpoint completion
+  proto::ckptmgr::StatefulConsistentCheckpointSaved msg;
+  msg.mutable_consistent_checkpoint()->CopyFrom(new_ckpts.consistent_checkpoints(0));
+
+  for (auto & stmgr : stmgrs_) {
+    stmgr.second->SendCheckpointSavedMessage(msg);
+  }
+
+  // clean oldest checkpoint on save
+  std::string oldest_ckpt_id =
+      new_ckpts.consistent_checkpoints(num_ckpts - 1).checkpoint_id();
+
+  ckptmgr_client_->SendCleanStatefulCheckpointRequest(oldest_ckpt_id, false);
 }
 
 // Called when ckptmgr completes the clean stateful checkpoint request
diff --git a/heron/tmaster/src/cpp/manager/tmaster.h 
b/heron/tmaster/src/cpp/manager/tmaster.h
index d37fbaa..3bdd25e 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.h
+++ b/heron/tmaster/src/cpp/manager/tmaster.h
@@ -195,7 +195,8 @@ class TMaster {
                                      const ComponentConfigMap& _config);
 
   // Function called when a new stateful ckpt record is saved
-  void HandleStatefulCheckpointSave(const std::string& _oldest_ckpt);
+  void HandleStatefulCheckpointSave(
+      const proto::ckptmgr::StatefulConsistentCheckpoints &new_ckpts);
 
   // Function called to kill container
   void KillContainer(const std::string& host_name,

Reply via email to