http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
index 0eb140a..b33a69e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
@@ -38,10 +38,13 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.TaskLocalStateStore;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImplTest;
+import org.apache.flink.runtime.state.TestTaskLocalStateStore;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -56,8 +59,6 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Random;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -91,19 +92,6 @@ public class StreamTaskStateInitializerImplTest {
                        typeSerializer,
                        closeableRegistry);
 
-               verify(stateBackend).createKeyedStateBackend(
-                       any(Environment.class),
-                       any(JobID.class),
-                       any(String.class),
-                       eq(typeSerializer),
-                       anyInt(),
-                       any(KeyGroupRange.class),
-                       any(TaskKvStateRegistry.class));
-
-               verify(stateBackend).createOperatorStateBackend(
-                       any(Environment.class),
-                       any(String.class));
-
                OperatorStateBackend operatorStateBackend = 
stateContext.operatorStateBackend();
                AbstractKeyedStateBackend<?> keyedStateBackend = 
stateContext.keyedStateBackend();
                InternalTimeServiceManager<?, ?> timeServiceManager = 
stateContext.internalTimerServiceManager();
@@ -124,13 +112,8 @@ public class StreamTaskStateInitializerImplTest {
                        keyedStateInputs,
                        operatorStateInputs);
 
-               for (KeyGroupStatePartitionStreamProvider keyedStateInput : 
keyedStateInputs) {
-                       Assert.fail();
-               }
-
-               for (StatePartitionStreamProvider operatorStateInput : 
operatorStateInputs) {
-                       Assert.fail();
-               }
+               Assert.assertFalse(keyedStateInputs.iterator().hasNext());
+               Assert.assertFalse(operatorStateInputs.iterator().hasNext());
        }
 
        @SuppressWarnings("unchecked")
@@ -172,14 +155,14 @@ public class StreamTaskStateInitializerImplTest {
                Random random = new Random(0x42);
 
                OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(
-                       new OperatorStateHandle(
+                       new OperatorStreamStateHandle(
                                Collections.singletonMap(
                                        "a",
                                        new OperatorStateHandle.StateMetaInfo(
                                                new long[]{0, 10},
                                                
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)),
                                
CheckpointTestUtils.createDummyStreamStateHandle(random)),
-                       new OperatorStateHandle(
+                       new OperatorStreamStateHandle(
                                Collections.singletonMap(
                                        "_default_",
                                        new OperatorStateHandle.StateMetaInfo(
@@ -209,19 +192,6 @@ public class StreamTaskStateInitializerImplTest {
                        typeSerializer,
                        closeableRegistry);
 
-               verify(mockingBackend).createKeyedStateBackend(
-                       any(Environment.class),
-                       any(JobID.class),
-                       any(String.class),
-                       eq(typeSerializer),
-                       anyInt(),
-                       any(KeyGroupRange.class),
-                       any(TaskKvStateRegistry.class));
-
-               verify(mockingBackend).createOperatorStateBackend(
-                       any(Environment.class),
-                       any(String.class));
-
                OperatorStateBackend operatorStateBackend = 
stateContext.operatorStateBackend();
                AbstractKeyedStateBackend<?> keyedStateBackend = 
stateContext.keyedStateBackend();
                InternalTimeServiceManager<?, ?> timeServiceManager = 
stateContext.internalTimerServiceManager();
@@ -276,11 +246,14 @@ public class StreamTaskStateInitializerImplTest {
                ExecutionAttemptID executionAttemptID = new 
ExecutionAttemptID(23L, 24L);
                TestCheckpointResponder checkpointResponderMock = new 
TestCheckpointResponder();
 
+               TaskLocalStateStore taskLocalStateStore = new 
TestTaskLocalStateStore();
+
                TaskStateManager taskStateManager = 
TaskStateManagerImplTest.taskStateManager(
                        jobID,
                        executionAttemptID,
                        checkpointResponderMock,
-                       jobManagerTaskRestore);
+                       jobManagerTaskRestore,
+                       taskLocalStateStore);
 
                DummyEnvironment dummyEnvironment = new 
DummyEnvironment("test-task", 1, 0);
                dummyEnvironment.setTaskStateManager(taskStateManager);

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 8e80f44..35e2fbd 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -538,7 +538,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
                testHarness.waitForTaskCompletion();
 
                // set the operator state from previous attempt into the 
restored one
-               TaskStateSnapshot subtaskStates = 
taskStateManagerMock.getLastTaskStateSnapshot();
+               TaskStateSnapshot subtaskStates = 
taskStateManagerMock.getLastJobManagerTaskStateSnapshot();
 
                final OneInputStreamTaskTestHarness<Integer, Integer> 
restoredTaskHarness =
                                new OneInputStreamTaskTestHarness<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 6926480..6a13f11 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -405,7 +405,6 @@ public class WindowOperatorTest extends TestLogger {
                OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
-
                testHarness.close();
 
                testHarness = createTestHarness(operator);
@@ -483,7 +482,6 @@ public class WindowOperatorTest extends TestLogger {
                OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
-
                testHarness.close();
 
                testHarness = createTestHarness(operator);
@@ -794,7 +792,6 @@ public class WindowOperatorTest extends TestLogger {
                OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
-
                testHarness.close();
 
                expectedOutput.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 499dfb1..84bcf5a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -60,6 +61,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestTaskStateManager;
@@ -197,7 +199,7 @@ public class InterruptSensitiveRestoreTest {
                KeyGroupRangeOffsets keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(new KeyGroupRange(0, 0));
 
                Collection<OperatorStateHandle> operatorStateHandles =
-                               Collections.singletonList(new 
OperatorStateHandle(operatorStateMetadata, state));
+                               Collections.singletonList(new 
OperatorStreamStateHandle(operatorStateMetadata, state));
 
                List<KeyedStateHandle> keyedStateHandles =
                                Collections.singletonList(new 
KeyGroupsStateHandle(keyGroupRangeOffsets, state));
@@ -220,10 +222,10 @@ public class InterruptSensitiveRestoreTest {
                }
 
                OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(
-                       operatorStateBackend,
-                       operatorStateStream,
-                       keyedStateFromBackend,
-                       keyedStateFromStream);
+                       new StateObjectCollection<>(operatorStateBackend),
+                       new StateObjectCollection<>(operatorStateStream),
+                       new StateObjectCollection<>(keyedStateFromBackend),
+                       new StateObjectCollection<>(keyedStateFromStream));
 
                JobVertexID jobVertexID = new JobVertexID();
                OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
@@ -254,7 +256,7 @@ public class InterruptSensitiveRestoreTest {
 
                TestTaskStateManager taskStateManager = new 
TestTaskStateManager();
                
taskStateManager.setReportedCheckpointId(taskRestore.getRestoreCheckpointId());
-               taskStateManager.setTaskStateSnapshotsByCheckpointId(
+               taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(
                        Collections.singletonMap(
                                taskRestore.getRestoreCheckpointId(),
                                taskRestore.getTaskStateSnapshot()));

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
new file mode 100644
index 0000000..e35f97c
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.TaskLocalStateStore;
+import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
+import org.apache.flink.runtime.state.TaskStateManagerImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for forwarding of state reporting to and from {@link 
org.apache.flink.runtime.state.TaskStateManager}.
+ */
+public class LocalStateForwardingTest extends TestLogger {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder(); // a fresh folder from this rule is handed to LocalRecoveryDirectoryProviderImpl
+
+       /**
+        * Tests that the JobManager-owned (jm) and task-local (tm) state from the futures reported by the backends
+        * is forwarded through the async checkpointing runnable to the {@link org.apache.flink.runtime.state.TaskStateManager}.
+        */
+       @Test
+       public void testReportingFromSnapshotToTaskStateManager() {
+
+               TestTaskStateManager taskStateManager = new 
TestTaskStateManager(); // its getLast*TaskStateSnapshot() accessors are read back after the run
+
+               StreamMockEnvironment streamMockEnvironment = new 
StreamMockEnvironment(
+                       new Configuration(),
+                       new Configuration(),
+                       new ExecutionConfig(),
+                       1024 * 1024,
+                       new MockInputSplitProvider(),
+                       0,
+                       taskStateManager);
+
+               StreamTask testStreamTask = new 
StreamTaskTest.NoOpStreamTask(streamMockEnvironment);
+               CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(0L, 0L);
+               CheckpointMetrics checkpointMetrics = new CheckpointMetrics();
+
+               Map<OperatorID, OperatorSnapshotFutures> snapshots = new 
HashMap<>(1);
+               OperatorSnapshotFutures osFuture = new 
OperatorSnapshotFutures();
+
+               
osFuture.setKeyedStateManagedFuture(createSnapshotResult(KeyedStateHandle.class)); // all four futures come from createSnapshotResult, i.e. a DoneFuture around two mocks
+               
osFuture.setKeyedStateRawFuture(createSnapshotResult(KeyedStateHandle.class));
+               
osFuture.setOperatorStateManagedFuture(createSnapshotResult(OperatorStateHandle.class));
+               
osFuture.setOperatorStateRawFuture(createSnapshotResult(OperatorStateHandle.class));
+
+               OperatorID operatorID = new OperatorID();
+               snapshots.put(operatorID, osFuture); // single operator; the same id is used for the lookups below
+
+               StreamTask.AsyncCheckpointRunnable checkpointRunnable =
+                       new StreamTask.AsyncCheckpointRunnable(
+                               testStreamTask,
+                               snapshots,
+                               checkpointMetaData,
+                               checkpointMetrics,
+                               0L);
+
+               checkpointRunnable.run(); // invoked directly on the test thread instead of being handed to an executor
+
+               TaskStateSnapshot lastJobManagerTaskStateSnapshot = 
taskStateManager.getLastJobManagerTaskStateSnapshot();
+               TaskStateSnapshot lastTaskManagerTaskStateSnapshot = 
taskStateManager.getLastTaskManagerTaskStateSnapshot();
+
+               OperatorSubtaskState jmState =
+                       
lastJobManagerTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+
+               OperatorSubtaskState tmState =
+                       
lastTaskManagerTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+
+               performCheck(osFuture.getKeyedStateManagedFuture(), 
jmState.getManagedKeyedState(), tmState.getManagedKeyedState()); // one check per state kind: future result vs. reported jm/tm collections
+               performCheck(osFuture.getKeyedStateRawFuture(), 
jmState.getRawKeyedState(), tmState.getRawKeyedState());
+               performCheck(osFuture.getOperatorStateManagedFuture(), 
jmState.getManagedOperatorState(), tmState.getManagedOperatorState());
+               performCheck(osFuture.getOperatorStateRawFuture(), 
jmState.getRawOperatorState(), tmState.getRawOperatorState());
+       }
+
+       /**
+        * Tests that state reported to the {@link org.apache.flink.runtime.state.TaskStateManager} is also
+        * reported to the {@link org.apache.flink.runtime.taskmanager.CheckpointResponder} and the {@link TaskLocalStateStoreImpl}.
+        */
+       @Test
+       public void 
testReportingFromTaskStateManagerToResponderAndTaskLocalStateStore() throws 
Exception {
+
+               final JobID jobID = new JobID();
+               final AllocationID allocationID = new AllocationID();
+               final ExecutionAttemptID executionAttemptID = new 
ExecutionAttemptID();
+               final CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(42L, 4711L);
+               final CheckpointMetrics checkpointMetrics = new 
CheckpointMetrics();
+               final int subtaskIdx = 42;
+               JobVertexID jobVertexID = new JobVertexID();
+
+               TaskStateSnapshot jmSnapshot = new TaskStateSnapshot();
+               TaskStateSnapshot tmSnapshot = new TaskStateSnapshot();
+
+               final AtomicBoolean jmReported = new AtomicBoolean(false); // set by the acknowledgeCheckpoint override
+               final AtomicBoolean tmReported = new AtomicBoolean(false); // set by the storeLocalState override
+
+               TestCheckpointResponder checkpointResponder = new 
TestCheckpointResponder() {
+
+                       @Override
+                       public void acknowledgeCheckpoint(
+                               JobID lJobID,
+                               ExecutionAttemptID lExecutionAttemptID,
+                               long lCheckpointId,
+                               CheckpointMetrics lCheckpointMetrics,
+                               TaskStateSnapshot lSubtaskState) { // NOTE(review): lSubtaskState is never compared with jmSnapshot, unlike localState in storeLocalState below - confirm this is intended
+
+                               Assert.assertEquals(jobID, lJobID);
+                               Assert.assertEquals(executionAttemptID, 
lExecutionAttemptID);
+                               
Assert.assertEquals(checkpointMetaData.getCheckpointId(), lCheckpointId);
+                               Assert.assertEquals(checkpointMetrics, 
lCheckpointMetrics);
+                               jmReported.set(true); // checked by the final assertions of the test
+                       }
+               };
+
+               Executor executor = Executors.directExecutor();
+
+               LocalRecoveryDirectoryProviderImpl directoryProvider = new 
LocalRecoveryDirectoryProviderImpl(
+                       temporaryFolder.newFolder(),
+                       jobID,
+                       jobVertexID,
+                       subtaskIdx);
+
+               LocalRecoveryConfig localRecoveryConfig = new 
LocalRecoveryConfig(
+                       LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED,
+                       directoryProvider);
+
+               TaskLocalStateStore taskLocalStateStore =
+                       new TaskLocalStateStoreImpl(jobID, allocationID, 
jobVertexID, subtaskIdx, localRecoveryConfig, executor) { // the override below does not call super, it only checks and records the call
+                               @Override
+                               public void storeLocalState(
+                                       @Nonnegative long checkpointId,
+                                       @Nullable TaskStateSnapshot localState) 
{
+
+                                       Assert.assertEquals(tmSnapshot, 
localState);
+                                       tmReported.set(true); // checked by the final assertions of the test
+                               }
+                       };
+
+               TaskStateManagerImpl taskStateManager =
+                       new TaskStateManagerImpl(
+                               jobID,
+                               executionAttemptID,
+                               taskLocalStateStore,
+                               null, // presumably the JobManagerTaskRestore (nothing to restore) - TODO confirm against the constructor
+                               checkpointResponder);
+
+               taskStateManager.reportTaskStateSnapshots(
+                       checkpointMetaData,
+                       checkpointMetrics,
+                       jmSnapshot,
+                       tmSnapshot);
+
+               Assert.assertTrue("Reporting for JM state was not called.", 
jmReported.get());
+               Assert.assertTrue("Reporting for TM state was not called.", 
tmReported.get());
+       }
+
+       private static <T extends StateObject> void performCheck( // asserts that both parts of the future's SnapshotResult show up in the reported collections
+               Future<SnapshotResult<T>> resultFuture, // supplies the expected jm-owned and task-local snapshots
+               StateObjectCollection<T> jmState, // its first element is compared with getJobManagerOwnedSnapshot()
+               StateObjectCollection<T> tmState) { // its first element is compared with getTaskLocalSnapshot()
+
+               SnapshotResult<T> snapshotResult;
+               try {
+                       snapshotResult = resultFuture.get();
+               } catch (Exception e) {
+                       throw new RuntimeException(e); // rethrown unchecked with the cause preserved, so callers need no throws clause
+               }
+
+               Assert.assertEquals(
+                       snapshotResult.getJobManagerOwnedSnapshot(),
+                       jmState.iterator().next()); // only the first element is inspected
+
+               Assert.assertEquals(
+                       snapshotResult.getTaskLocalSnapshot(),
+                       tmState.iterator().next()); // only the first element is inspected
+       }
+
+       private static <T extends StateObject> 
RunnableFuture<SnapshotResult<T>> createSnapshotResult(Class<T> clazz) {
+               return DoneFuture.of(SnapshotResult.withLocalState(mock(clazz), 
mock(clazz))); // mock(clazz) is called twice, once per withLocalState argument
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 3e0459d..81fb447 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -538,20 +538,20 @@ public class OneInputStreamTaskTest extends TestLogger {
 
                restoredTaskHarness.configureForKeyedStream(keySelector, 
BasicTypeInfo.STRING_TYPE_INFO);
 
-               restoredTaskHarness.setTaskStateSnapshot(checkpointId, 
taskStateManager.getLastTaskStateSnapshot());
+               restoredTaskHarness.setTaskStateSnapshot(checkpointId, 
taskStateManager.getLastJobManagerTaskStateSnapshot());
 
                StreamConfig restoredTaskStreamConfig = 
restoredTaskHarness.getStreamConfig();
 
                configureChainedTestingStreamOperator(restoredTaskStreamConfig, 
numberChainedTasks);
 
-               TaskStateSnapshot stateHandles = 
taskStateManager.getLastTaskStateSnapshot();
+               TaskStateSnapshot stateHandles = 
taskStateManager.getLastJobManagerTaskStateSnapshot();
                Assert.assertEquals(numberChainedTasks, 
stateHandles.getSubtaskStateMappings().size());
 
                TestingStreamOperator.numberRestoreCalls = 0;
 
                // transfer state to new harness
                
restoredTaskHarness.taskStateManager.restoreLatestCheckpointState(
-                       taskStateManager.getTaskStateSnapshotsByCheckpointId());
+                       
taskStateManager.getJobManagerTaskStateSnapshotsByCheckpointId());
                restoredTaskHarness.invoke();
                restoredTaskHarness.endInput();
                
restoredTaskHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
index f9f4473..c66040a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java
@@ -252,7 +252,7 @@ public class RestoreStreamTaskTest extends TestLogger {
 
                JobManagerTaskRestore jobManagerTaskRestore = new 
JobManagerTaskRestore(
                        taskStateManager.getReportedCheckpointId(),
-                       taskStateManager.getLastTaskStateSnapshot());
+                       taskStateManager.getLastJobManagerTaskStateSnapshot());
 
                testHarness.endInput();
                testHarness.waitForTaskCompletion();

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 0ba081e..32de8d5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -46,7 +46,6 @@ import 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.TaskLocalStateStore;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
@@ -159,8 +158,6 @@ public class StreamMockEnvironment implements Environment {
 
                KvStateRegistry registry = new KvStateRegistry();
                this.kvStateRegistry = registry.createTaskRegistry(jobID, 
getJobVertexId());
-
-               final TaskLocalStateStore localStateStore = new 
TaskLocalStateStore(jobID, getJobVertexId(), subtaskIndex);
        }
 
        public StreamMockEnvironment(
@@ -304,10 +301,11 @@ public class StreamMockEnvironment implements Environment 
{
 
        @Override
        public void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics 
checkpointMetrics, TaskStateSnapshot subtaskState) {
-               taskStateManager.reportStateHandles(
+               taskStateManager.reportTaskStateSnapshots(
                        new CheckpointMetaData(checkpointId, 0L),
                        checkpointMetrics,
-                       subtaskState);
+                       subtaskState,
+                       null);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 24b2014..62a903b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -58,6 +58,7 @@ import 
org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
@@ -282,10 +283,10 @@ public class StreamTaskTerminationTest extends TestLogger 
{
                }
        }
 
-       static class BlockingCallable implements Callable<OperatorStateHandle> {
+       static class BlockingCallable implements 
Callable<SnapshotResult<OperatorStateHandle>> {
 
                @Override
-               public OperatorStateHandle call() throws Exception {
+               public SnapshotResult<OperatorStateHandle> call() throws 
Exception {
                        // notify that we have started the asynchronous 
checkpointed operation
                        CHECKPOINTING_LATCH.trigger();
                        // wait until we have reached the StreamTask#cleanup 
--> This will already cancel this FutureTask

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 99d4e5b..caea662 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -67,15 +68,16 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DoneFuture;
-import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateBackendFactory;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
-import org.apache.flink.runtime.state.TaskLocalStateStore;
+import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
 import org.apache.flink.runtime.state.TestTaskStateManager;
@@ -94,6 +96,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -147,7 +150,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyCollectionOf;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -221,7 +223,7 @@ public class StreamTaskTest extends TestLogger {
        @Test
        public void testStateBackendLoadingAndClosing() throws Exception {
                Configuration taskManagerConfig = new Configuration();
-               taskManagerConfig.setString(CheckpointingOptions.STATE_BACKEND, 
MockStateBackend.class.getName());
+               taskManagerConfig.setString(CheckpointingOptions.STATE_BACKEND, 
TestMemoryStateBackendFactory.class.getName());
 
                StreamConfig cfg = new StreamConfig(new Configuration());
                cfg.setStateKeySerializer(mock(TypeSerializer.class));
@@ -253,7 +255,7 @@ public class StreamTaskTest extends TestLogger {
        @Test
        public void testStateBackendClosingOnFailure() throws Exception {
                Configuration taskManagerConfig = new Configuration();
-               taskManagerConfig.setString(CheckpointingOptions.STATE_BACKEND, 
MockStateBackend.class.getName());
+               taskManagerConfig.setString(CheckpointingOptions.STATE_BACKEND, 
TestMemoryStateBackendFactory.class.getName());
 
                StreamConfig cfg = new StreamConfig(new Configuration());
                cfg.setStateKeySerializer(mock(TypeSerializer.class));
@@ -414,7 +416,7 @@ public class StreamTaskTest extends TestLogger {
                OperatorSnapshotFutures operatorSnapshotResult2 = 
mock(OperatorSnapshotFutures.class);
                OperatorSnapshotFutures operatorSnapshotResult3 = 
mock(OperatorSnapshotFutures.class);
 
-               RunnableFuture<OperatorStateHandle> failingFuture = 
mock(RunnableFuture.class);
+               RunnableFuture<SnapshotResult<OperatorStateHandle>> 
failingFuture = mock(RunnableFuture.class);
                when(failingFuture.get()).thenThrow(new ExecutionException(new 
Exception("Test exception")));
 
                
when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
@@ -499,7 +501,7 @@ public class StreamTaskTest extends TestLogger {
                TaskStateManager taskStateManager = new TaskStateManagerImpl(
                        new JobID(1L, 2L),
                        new ExecutionAttemptID(1L, 2L),
-                       mock(TaskLocalStateStore.class),
+                       mock(TaskLocalStateStoreImpl.class),
                        null,
                        checkpointResponder);
 
@@ -514,17 +516,18 @@ public class StreamTaskTest extends TestLogger {
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
 
                StreamOperator<?> streamOperator = mock(StreamOperator.class);
+               when(streamOperator.getOperatorID()).thenReturn(new 
OperatorID(42, 42));
 
                KeyedStateHandle managedKeyedStateHandle = 
mock(KeyedStateHandle.class);
                KeyedStateHandle rawKeyedStateHandle = 
mock(KeyedStateHandle.class);
-               OperatorStateHandle managedOperatorStateHandle = 
mock(OperatorStateHandle.class);
-               OperatorStateHandle rawOperatorStateHandle = 
mock(OperatorStateHandle.class);
+               OperatorStateHandle managedOperatorStateHandle = 
mock(OperatorStreamStateHandle.class);
+               OperatorStateHandle rawOperatorStateHandle = 
mock(OperatorStreamStateHandle.class);
 
                OperatorSnapshotFutures operatorSnapshotResult = new 
OperatorSnapshotFutures(
-                       new DoneFuture<>(managedKeyedStateHandle),
-                       new DoneFuture<>(rawKeyedStateHandle),
-                       new DoneFuture<>(managedOperatorStateHandle),
-                       new DoneFuture<>(rawOperatorStateHandle));
+                       
DoneFuture.of(SnapshotResult.of(managedKeyedStateHandle)),
+                       DoneFuture.of(SnapshotResult.of(rawKeyedStateHandle)),
+                       
DoneFuture.of(SnapshotResult.of(managedOperatorStateHandle)),
+                       
DoneFuture.of(SnapshotResult.of(rawOperatorStateHandle)));
 
                when(streamOperator.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class), 
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult);
 
@@ -561,10 +564,10 @@ public class StreamTaskTest extends TestLogger {
                OperatorSubtaskState subtaskState = 
subtaskStates.getSubtaskStateMappings().iterator().next().getValue();
 
                // check that the subtask state contains the expected state 
handles
-               
assertEquals(Collections.singletonList(managedKeyedStateHandle), 
subtaskState.getManagedKeyedState());
-               assertEquals(Collections.singletonList(rawKeyedStateHandle), 
subtaskState.getRawKeyedState());
-               
assertEquals(Collections.singletonList(managedOperatorStateHandle), 
subtaskState.getManagedOperatorState());
-               assertEquals(Collections.singletonList(rawOperatorStateHandle), 
subtaskState.getRawOperatorState());
+               
assertEquals(StateObjectCollection.singleton(managedKeyedStateHandle), 
subtaskState.getManagedKeyedState());
+               
assertEquals(StateObjectCollection.singleton(rawKeyedStateHandle), 
subtaskState.getRawKeyedState());
+               
assertEquals(StateObjectCollection.singleton(managedOperatorStateHandle), 
subtaskState.getManagedOperatorState());
+               
assertEquals(StateObjectCollection.singleton(rawOperatorStateHandle), 
subtaskState.getRawOperatorState());
 
                // check that the state handles have not been discarded
                verify(managedKeyedStateHandle, never()).discardState();
@@ -602,26 +605,15 @@ public class StreamTaskTest extends TestLogger {
 
                Environment mockEnvironment = spy(new MockEnvironment());
 
-               whenNew(OperatorSubtaskState.class).
-                       withArguments(
-                               anyCollectionOf(OperatorStateHandle.class),
-                               anyCollectionOf(OperatorStateHandle.class),
-                               anyCollectionOf(KeyedStateHandle.class),
-                               anyCollectionOf(KeyedStateHandle.class)).
-                       thenAnswer(new Answer<OperatorSubtaskState>() {
-                               @Override
-                       public OperatorSubtaskState answer(InvocationOnMock 
invocation) throws Throwable {
-                               createSubtask.trigger();
-                               completeSubtask.await();
-                               Object[] arguments = invocation.getArguments();
-                               return new OperatorSubtaskState(
-                                       (OperatorStateHandle) arguments[0],
-                                       (OperatorStateHandle) arguments[1],
-                                       (KeyedStateHandle) arguments[2],
-                                       (KeyedStateHandle) arguments[3]
-                               );
-                       }
-               });
+               whenNew(OperatorSnapshotFinalizer.class).
+                       withAnyArguments().
+                       thenAnswer((Answer<OperatorSnapshotFinalizer>) 
invocation -> {
+                                       createSubtask.trigger();
+                                       completeSubtask.await();
+                                       Object[] arguments = 
invocation.getArguments();
+                                       return new 
OperatorSnapshotFinalizer((OperatorSnapshotFutures) arguments[0]);
+                               }
+                       );
 
                StreamTask<?, ?> streamTask = new 
EmptyStreamTask(mockEnvironment);
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
@@ -632,14 +624,14 @@ public class StreamTaskTest extends TestLogger {
 
                KeyedStateHandle managedKeyedStateHandle = 
mock(KeyedStateHandle.class);
                KeyedStateHandle rawKeyedStateHandle = 
mock(KeyedStateHandle.class);
-               OperatorStateHandle managedOperatorStateHandle = 
mock(OperatorStateHandle.class);
-               OperatorStateHandle rawOperatorStateHandle = 
mock(OperatorStateHandle.class);
+               OperatorStateHandle managedOperatorStateHandle = 
mock(OperatorStreamStateHandle.class);
+               OperatorStateHandle rawOperatorStateHandle = 
mock(OperatorStreamStateHandle.class);
 
                OperatorSnapshotFutures operatorSnapshotResult = new 
OperatorSnapshotFutures(
-                       new DoneFuture<>(managedKeyedStateHandle),
-                       new DoneFuture<>(rawKeyedStateHandle),
-                       new DoneFuture<>(managedOperatorStateHandle),
-                       new DoneFuture<>(rawOperatorStateHandle));
+                       
DoneFuture.of(SnapshotResult.of(managedKeyedStateHandle)),
+                       DoneFuture.of(SnapshotResult.of(rawKeyedStateHandle)),
+                       
DoneFuture.of(SnapshotResult.of(managedOperatorStateHandle)),
+                       
DoneFuture.of(SnapshotResult.of(rawOperatorStateHandle)));
 
                when(streamOperator.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class), 
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult);
 
@@ -722,7 +714,7 @@ public class StreamTaskTest extends TestLogger {
                TaskStateManager taskStateManager = new TaskStateManagerImpl(
                        new JobID(1L, 2L),
                        new ExecutionAttemptID(1L, 2L),
-                       mock(TaskLocalStateStore.class),
+                       mock(TaskLocalStateStoreImpl.class),
                        null,
                        checkpointResponder);
 
@@ -823,7 +815,13 @@ public class StreamTaskTest extends TestLogger {
        //  Test Utilities
        // 
------------------------------------------------------------------------
 
-       private static class NoOpStreamTask<T, OP extends StreamOperator<T>> 
extends StreamTask<T, OP> {
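+       /**
+        * Stream task that does nothing.
+        *
+        * @param <T> type argument of the operator type {@code OP}
+        * @param <OP> stream operator type, passed through to {@link StreamTask}
+        */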
+       public static class NoOpStreamTask<T, OP extends StreamOperator<T>> 
extends StreamTask<T, OP> {
 
                public NoOpStreamTask(Environment environment) {
                        super(environment, null);
@@ -898,9 +896,17 @@ public class StreamTaskTest extends TestLogger {
        }
 
        public static Task createTask(
+               Class<? extends AbstractInvokable> invokable,
+               StreamConfig taskConfig,
+               Configuration taskManagerConfig) throws Exception {
+               return createTask(invokable, taskConfig, taskManagerConfig, new 
TestTaskStateManager());
+       }
+
+       public static Task createTask(
                        Class<? extends AbstractInvokable> invokable,
                        StreamConfig taskConfig,
-                       Configuration taskManagerConfig) throws Exception {
+                       Configuration taskManagerConfig,
+                       TestTaskStateManager taskStateManager) throws Exception 
{
 
                BlobCacheService blobService =
                        new BlobCacheService(mock(PermanentBlobCache.class), 
mock(TransientBlobCache.class));
@@ -951,7 +957,7 @@ public class StreamTaskTest extends TestLogger {
                        mock(IOManager.class),
                        network,
                        mock(BroadcastVariableManager.class),
-                       new TestTaskStateManager(),
+                       taskStateManager,
                        mock(TaskManagerActions.class),
                        mock(InputSplitProvider.class),
                        mock(CheckpointResponder.class),
@@ -1021,22 +1027,16 @@ public class StreamTaskTest extends TestLogger {
        /**
         * Mocked state backend factory which returns mocks for the operator 
and keyed state backends.
         */
-       public static final class MockStateBackend implements 
StateBackendFactory<AbstractStateBackend> {
+       public static final class TestMemoryStateBackendFactory implements 
StateBackendFactory<AbstractStateBackend> {
                private static final long serialVersionUID = 1L;
 
                @Override
                public AbstractStateBackend createFromConfig(Configuration 
config) {
-                       return new MemoryStateBackend() {
-                               @Override
-                               public OperatorStateBackend 
createOperatorStateBackend(Environment env, String operatorIdentifier) throws 
Exception {
-                                       return 
spy(super.createOperatorStateBackend(env, operatorIdentifier));
-                               }
+                       return new 
TestSpyWrapperStateBackend(createInnerBackend(config));
+               }
 
-                               @Override
-                               public <K> AbstractKeyedStateBackend<K> 
createKeyedStateBackend(Environment env, JobID jobID, String 
operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, 
KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) {
-                                       return 
spy(super.createKeyedStateBackend(env, jobID, operatorIdentifier, 
keySerializer, numberOfKeyGroups, keyGroupRange, kvStateRegistry));
-                               }
-                       };
+               protected AbstractStateBackend createInnerBackend(Configuration 
config) {
+                       return new MemoryStateBackend();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index bcb833e..08032bd 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -141,7 +141,7 @@ public class StreamTaskTestHarness<OUT> {
 
        public void setTaskStateSnapshot(long checkpointId, TaskStateSnapshot 
taskStateSnapshot) {
                taskStateManager.setReportedCheckpointId(checkpointId);
-               taskStateManager.setTaskStateSnapshotsByCheckpointId(
+               taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(
                        Collections.singletonMap(checkpointId, 
taskStateSnapshot));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 6df33b7..3d3b28e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -61,6 +61,7 @@ import 
org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -81,9 +82,10 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Collections;
-import java.util.concurrent.Callable;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
 
@@ -303,7 +305,7 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                                env.getExecutionConfig(),
                                true) {
                                @Override
-                               public RunnableFuture<OperatorStateHandle> 
snapshot(
+                               public 
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
                                        long checkpointId,
                                        long timestamp,
                                        CheckpointStreamFactory streamFactory,
@@ -332,17 +334,14 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                                env.getExecutionConfig(),
                                true) {
                                @Override
-                               public RunnableFuture<OperatorStateHandle> 
snapshot(
+                               public 
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
                                        long checkpointId,
                                        long timestamp,
                                        CheckpointStreamFactory streamFactory,
                                        CheckpointOptions checkpointOptions) 
throws Exception {
 
-                                       return new FutureTask<>(new 
Callable<OperatorStateHandle>() {
-                                               @Override
-                                               public OperatorStateHandle 
call() throws Exception {
-                                                       throw new 
Exception("Async part snapshot exception.");
-                                               }
+                                       return new FutureTask<>(() -> {
+                                               throw new Exception("Async part 
snapshot exception.");
                                        });
                                }
                        };
@@ -364,6 +363,7 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                private final Object lock = new Object();
                private volatile boolean closed;
 
+               @Nullable
                @Override
                public StreamStateHandle closeAndGetHandle() throws IOException 
{
                        throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
new file mode 100644
index 0000000..914326b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+/**
+ * This class wraps an {@link AbstractStateBackend} and wraps every object the delegate creates in a spy.
+ */
+public class TestSpyWrapperStateBackend extends AbstractStateBackend {
+
+               private final AbstractStateBackend delegate;
+
+               public TestSpyWrapperStateBackend(AbstractStateBackend 
delegate) {
+                       this.delegate = Preconditions.checkNotNull(delegate);
+               }
+
+               @Override
+               public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+                       Environment env,
+                       JobID jobID,
+                       String operatorIdentifier,
+                       TypeSerializer<K> keySerializer,
+                       int numberOfKeyGroups,
+                       KeyGroupRange keyGroupRange,
+                       TaskKvStateRegistry kvStateRegistry) throws IOException 
{
+                       return spy(delegate.createKeyedStateBackend(
+                               env,
+                               jobID,
+                               operatorIdentifier,
+                               keySerializer,
+                               numberOfKeyGroups,
+                               keyGroupRange,
+                               kvStateRegistry));
+               }
+
+               @Override
+               public OperatorStateBackend createOperatorStateBackend(
+                       Environment env, String operatorIdentifier) throws 
Exception {
+                       return spy(delegate.createOperatorStateBackend(env, 
operatorIdentifier));
+               }
+
+       @Override
+       public CompletedCheckpointStorageLocation resolveCheckpoint(String 
externalPointer) throws IOException {
+               return spy(delegate.resolveCheckpoint(externalPointer));
+       }
+
+       @Override
+       public CheckpointStorage createCheckpointStorage(JobID jobId) throws 
IOException {
+               return spy(delegate.createCheckpointStorage(jobId));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 28ad930..ed2da18 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import 
org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -47,6 +48,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -60,7 +62,6 @@ import 
org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
@@ -320,11 +321,27 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
         * subtask.
         */
        public void initializeState(OperatorSubtaskState operatorStateHandles) 
throws Exception {
+               initializeState(operatorStateHandles, null);
+       }
+
+       /**
+        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState()}.
+        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)}
+        * if it was not called before.
+        *
+        * @param jmOperatorStateHandles the primary state (owned by JM)
+        * @param tmOperatorStateHandles the (optional) local state (owned by 
TM) or null.
+        * @throws Exception
+        */
+       public void initializeState(
+               OperatorSubtaskState jmOperatorStateHandles,
+               OperatorSubtaskState tmOperatorStateHandles) throws Exception {
+
                if (!setupCalled) {
                        setup();
                }
 
-               if (operatorStateHandles != null) {
+               if (jmOperatorStateHandles != null) {
                        int numKeyGroups = 
getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks();
                        int numSubtasks = 
getEnvironment().getTaskInfo().getNumberOfParallelSubtasks();
                        int subtaskIndex = 
getEnvironment().getTaskInfo().getIndexOfThisSubtask();
@@ -332,53 +349,54 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                        // create a new OperatorStateHandles that only contains 
the state for our key-groups
 
                        List<KeyGroupRange> keyGroupPartitions = 
StateAssignmentOperation.createKeyGroupPartitions(
-                                       numKeyGroups,
-                                       numSubtasks);
+                               numKeyGroups,
+                               numSubtasks);
 
-                       KeyGroupRange localKeyGroupRange =
-                                       keyGroupPartitions.get(subtaskIndex);
+                       KeyGroupRange localKeyGroupRange = 
keyGroupPartitions.get(subtaskIndex);
 
-                       List<KeyedStateHandle> localManagedKeyGroupState = null;
-                       if (operatorStateHandles.getManagedKeyedState() != 
null) {
-                               localManagedKeyGroupState = 
StateAssignmentOperation.getKeyedStateHandles(
-                                               
operatorStateHandles.getManagedKeyedState(),
-                                               localKeyGroupRange);
-                       }
+                       List<KeyedStateHandle> localManagedKeyGroupState = 
StateAssignmentOperation.getKeyedStateHandles(
+                               jmOperatorStateHandles.getManagedKeyedState(),
+                               localKeyGroupRange);
 
-                       List<KeyedStateHandle> localRawKeyGroupState = null;
-                       if (operatorStateHandles.getRawKeyedState() != null) {
-                               localRawKeyGroupState = 
StateAssignmentOperation.getKeyedStateHandles(
-                                               
operatorStateHandles.getRawKeyedState(),
-                                               localKeyGroupRange);
-                       }
+                       List<KeyedStateHandle> localRawKeyGroupState = 
StateAssignmentOperation.getKeyedStateHandles(
+                               jmOperatorStateHandles.getRawKeyedState(),
+                               localKeyGroupRange);
 
                        List<OperatorStateHandle> managedOperatorState = new 
ArrayList<>();
-                       if (operatorStateHandles.getManagedOperatorState() != 
null) {
-                               
managedOperatorState.addAll(operatorStateHandles.getManagedOperatorState());
-                       }
+
+                       
managedOperatorState.addAll(jmOperatorStateHandles.getManagedOperatorState());
+
                        Collection<OperatorStateHandle> 
localManagedOperatorState = operatorStateRepartitioner.repartitionState(
-                                       managedOperatorState,
-                                       numSubtasks).get(subtaskIndex);
+                               managedOperatorState,
+                               numSubtasks).get(subtaskIndex);
 
                        List<OperatorStateHandle> rawOperatorState = new 
ArrayList<>();
-                       if (operatorStateHandles.getRawOperatorState() != null) 
{
-                               
rawOperatorState.addAll(operatorStateHandles.getRawOperatorState());
-                       }
+
+                       
rawOperatorState.addAll(jmOperatorStateHandles.getRawOperatorState());
+
                        Collection<OperatorStateHandle> localRawOperatorState = 
operatorStateRepartitioner.repartitionState(
-                                       rawOperatorState,
-                                       numSubtasks).get(subtaskIndex);
+                               rawOperatorState,
+                               numSubtasks).get(subtaskIndex);
 
-                       OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(
-                               
nullToEmptyCollection(localManagedOperatorState),
-                               nullToEmptyCollection(localRawOperatorState),
-                               
nullToEmptyCollection(localManagedKeyGroupState),
-                               nullToEmptyCollection(localRawKeyGroupState));
+                       OperatorSubtaskState processedJmOpSubtaskState = new 
OperatorSubtaskState(
+                               new 
StateObjectCollection<>(nullToEmptyCollection(localManagedOperatorState)),
+                               new 
StateObjectCollection<>(nullToEmptyCollection(localRawOperatorState)),
+                               new 
StateObjectCollection<>(nullToEmptyCollection(localManagedKeyGroupState)),
+                               new 
StateObjectCollection<>(nullToEmptyCollection(localRawKeyGroupState)));
 
-                       TaskStateSnapshot taskStateSnapshot = new 
TaskStateSnapshot();
-                       
taskStateSnapshot.putSubtaskStateByOperatorID(operator.getOperatorID(), 
operatorSubtaskState);
+                       TaskStateSnapshot jmTaskStateSnapshot = new 
TaskStateSnapshot();
+                       
jmTaskStateSnapshot.putSubtaskStateByOperatorID(operator.getOperatorID(), 
processedJmOpSubtaskState);
 
                        taskStateManager.setReportedCheckpointId(0);
-                       
taskStateManager.setTaskStateSnapshotsByCheckpointId(Collections.singletonMap(0L,
 taskStateSnapshot));
+                       
taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(
+                               Collections.singletonMap(0L, 
jmTaskStateSnapshot));
+
+                       if (tmOperatorStateHandles != null) {
+                               TaskStateSnapshot tmTaskStateSnapshot = new 
TaskStateSnapshot();
+                               
tmTaskStateSnapshot.putSubtaskStateByOperatorID(operator.getOperatorID(), 
tmOperatorStateHandles);
+                               
taskStateManager.setTaskManagerTaskStateSnapshotsByCheckpointId(
+                                       Collections.singletonMap(0L, 
tmTaskStateSnapshot));
+                       }
                }
 
                operator.initializeState();
@@ -422,35 +440,24 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                List<KeyedStateHandle> mergedManagedKeyedState = new 
ArrayList<>(handles.length);
                List<KeyedStateHandle> mergedRawKeyedState = new 
ArrayList<>(handles.length);
 
-               for (OperatorSubtaskState handle: handles) {
+               for (OperatorSubtaskState handle : handles) {
 
                        Collection<OperatorStateHandle> managedOperatorState = 
handle.getManagedOperatorState();
                        Collection<OperatorStateHandle> rawOperatorState = 
handle.getRawOperatorState();
                        Collection<KeyedStateHandle> managedKeyedState = 
handle.getManagedKeyedState();
                        Collection<KeyedStateHandle> rawKeyedState = 
handle.getRawKeyedState();
 
-                       if (managedOperatorState != null) {
-                               
mergedManagedOperatorState.addAll(managedOperatorState);
-                       }
-
-                       if (rawOperatorState != null) {
-                               mergedRawOperatorState.addAll(rawOperatorState);
-                       }
-
-                       if (managedKeyedState != null) {
-                               
mergedManagedKeyedState.addAll(managedKeyedState);
-                       }
-
-                       if (rawKeyedState != null) {
-                               mergedRawKeyedState.addAll(rawKeyedState);
-                       }
+                       mergedManagedOperatorState.addAll(managedOperatorState);
+                       mergedRawOperatorState.addAll(rawOperatorState);
+                       mergedManagedKeyedState.addAll(managedKeyedState);
+                       mergedRawKeyedState.addAll(rawKeyedState);
                }
 
                return new OperatorSubtaskState(
-                       mergedManagedOperatorState,
-                       mergedRawOperatorState,
-                       mergedManagedKeyedState,
-                       mergedRawKeyedState);
+                       new StateObjectCollection<>(mergedManagedOperatorState),
+                       new StateObjectCollection<>(mergedRawOperatorState),
+                       new StateObjectCollection<>(mergedManagedKeyedState),
+                       new StateObjectCollection<>(mergedRawKeyedState));
        }
 
        /**
@@ -469,6 +476,13 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
         * Calls {@link StreamOperator#snapshotState(long, long, 
CheckpointOptions, org.apache.flink.runtime.state.CheckpointStreamFactory)}.
         */
        public OperatorSubtaskState snapshot(long checkpointId, long timestamp) 
throws Exception {
+               return snapshotWithLocalState(checkpointId, 
timestamp).getJobManagerOwnedState();
+       }
+
+       /**
+        * Calls {@link StreamOperator#snapshotState(long, long, 
CheckpointOptions, org.apache.flink.runtime.state.CheckpointStreamFactory)}.
+        */
+       public OperatorSnapshotFinalizer snapshotWithLocalState(long 
checkpointId, long timestamp) throws Exception {
 
                OperatorSnapshotFutures operatorStateResult = 
operator.snapshotState(
                        checkpointId,
@@ -476,17 +490,7 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                        CheckpointOptions.forCheckpointWithDefaultLocation(),
                        
checkpointStorage.resolveCheckpointStorageLocation(checkpointId, 
CheckpointStorageLocationReference.getDefault()));
 
-               KeyedStateHandle keyedManaged = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
-               KeyedStateHandle keyedRaw = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
-
-               OperatorStateHandle opManaged = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
-               OperatorStateHandle opRaw = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
-
-               return new OperatorSubtaskState(
-                       opManaged != null ? 
Collections.singletonList(opManaged) : Collections.emptyList(),
-                       opRaw != null ? Collections.singletonList(opRaw) : 
Collections.emptyList(),
-                       keyedManaged != null ? 
Collections.singletonList(keyedManaged) : Collections.emptyList(),
-                       keyedRaw != null ? Collections.singletonList(keyedRaw) 
: Collections.emptyList());
+               return new OperatorSnapshotFinalizer(operatorStateResult);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 8d37266..1b5113d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -50,7 +51,7 @@ public class OperatorSnapshotUtil {
 
                try (DataOutputStream dos = new DataOutputStream(out)) {
 
-                       // must be here for compatibility
+                       // required for backwards compatibility.
                        dos.writeInt(0);
 
                        // still required for compatibility
@@ -108,15 +109,16 @@ public class OperatorSnapshotUtil {
                FileInputStream in = new FileInputStream(path);
                try (DataInputStream dis = new DataInputStream(in)) {
 
-                       // ignored
+                       // required for backwards compatibility.
                        dis.readInt();
 
                        // still required for compatibility to consume the 
bytes.
                        SavepointV1Serializer.deserializeStreamStateHandle(dis);
 
-                       List<OperatorStateHandle> rawOperatorState = new 
ArrayList<>();
+                       List<OperatorStateHandle> rawOperatorState = null;
                        int numRawOperatorStates = dis.readInt();
                        if (numRawOperatorStates >= 0) {
+                               rawOperatorState = new ArrayList<>();
                                for (int i = 0; i < numRawOperatorStates; i++) {
                                        OperatorStateHandle operatorState = 
SavepointV1Serializer.deserializeOperatorStateHandle(
                                                dis);
@@ -124,9 +126,10 @@ public class OperatorSnapshotUtil {
                                }
                        }
 
-                       List<OperatorStateHandle> managedOperatorState = new 
ArrayList<>();
+                       List<OperatorStateHandle> managedOperatorState = null;
                        int numManagedOperatorStates = dis.readInt();
                        if (numManagedOperatorStates >= 0) {
+                               managedOperatorState = new ArrayList<>();
                                for (int i = 0; i < numManagedOperatorStates; 
i++) {
                                        OperatorStateHandle operatorState = 
SavepointV1Serializer.deserializeOperatorStateHandle(
                                                dis);
@@ -134,9 +137,10 @@ public class OperatorSnapshotUtil {
                                }
                        }
 
-                       List<KeyedStateHandle> rawKeyedState = new 
ArrayList<>();
+                       List<KeyedStateHandle> rawKeyedState = null;
                        int numRawKeyedStates = dis.readInt();
                        if (numRawKeyedStates >= 0) {
+                               rawKeyedState = new ArrayList<>();
                                for (int i = 0; i < numRawKeyedStates; i++) {
                                        KeyedStateHandle keyedState = 
SavepointV1Serializer.deserializeKeyedStateHandle(
                                                dis);
@@ -144,9 +148,10 @@ public class OperatorSnapshotUtil {
                                }
                        }
 
-                       List<KeyedStateHandle> managedKeyedState = new 
ArrayList<>();
+                       List<KeyedStateHandle> managedKeyedState = null;
                        int numManagedKeyedStates = dis.readInt();
                        if (numManagedKeyedStates >= 0) {
+                               managedKeyedState = new ArrayList<>();
                                for (int i = 0; i < numManagedKeyedStates; i++) 
{
                                        KeyedStateHandle keyedState = 
SavepointV1Serializer.deserializeKeyedStateHandle(
                                                dis);
@@ -155,10 +160,10 @@ public class OperatorSnapshotUtil {
                        }
 
                        return new OperatorSubtaskState(
-                               managedOperatorState,
-                               rawOperatorState,
-                               managedKeyedState,
-                               rawKeyedState);
+                               new 
StateObjectCollection<>(managedOperatorState),
+                               new StateObjectCollection<>(rawOperatorState),
+                               new StateObjectCollection<>(managedKeyedState),
+                               new StateObjectCollection<>(rawKeyedState));
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 53d329b..557c097 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -104,7 +104,7 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
        public TestName name = new TestName();
 
        private StateBackendEnum stateBackendEnum;
-       private AbstractStateBackend stateBackend;
+       protected AbstractStateBackend stateBackend;
 
        AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum 
stateBackendEnum) {
                this.stateBackendEnum = stateBackendEnum;
@@ -128,23 +128,7 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                        zkServer.start();
                }
 
-               TemporaryFolder temporaryFolder = new TemporaryFolder();
-               temporaryFolder.create();
-               final File haDir = temporaryFolder.newFolder();
-
-               Configuration config = new Configuration();
-               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
PARALLELISM / 2);
-               config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
-               // the default network buffers size (10% of heap max =~ 150MB) 
seems to much for this test case
-               config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 
80L << 20); // 80 MB
-               config.setString(AkkaOptions.FRAMESIZE, 
String.valueOf(MAX_MEM_STATE_SIZE) + "b");
-
-               if (zkServer != null) {
-                       config.setString(HighAvailabilityOptions.HA_MODE, 
"ZOOKEEPER");
-                       
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
-                       
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
haDir.toURI().toString());
-               }
+               Configuration config = createClusterConfig();
 
                // purposefully delay in the executor to tease out races
                final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(10);
@@ -208,6 +192,27 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                }
        }
 
+       protected Configuration createClusterConfig() throws IOException {
+               TemporaryFolder temporaryFolder = new TemporaryFolder();
+               temporaryFolder.create();
+               final File haDir = temporaryFolder.newFolder();
+
+               Configuration config = new Configuration();
+               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
PARALLELISM / 2);
+               config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
+               // the default network buffers size (10% of heap max =~ 150MB) 
seems too much for this test case
+               config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 
80L << 20); // 80 MB
+               config.setString(AkkaOptions.FRAMESIZE, 
String.valueOf(MAX_MEM_STATE_SIZE) + "b");
+
+               if (zkServer != null) {
+                       config.setString(HighAvailabilityOptions.HA_MODE, 
"ZOOKEEPER");
+                       
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
+                       
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
haDir.toURI().toString());
+               }
+               return config;
+       }
+
        @After
        public void stopTestCluster() throws IOException {
                if (cluster != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
new file mode 100644
index 0000000..51b3b84
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+
+import static 
org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode;
+import static 
org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED;
+import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum;
+import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
+import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
+import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
+
+/**
+ * This test delegates to instances of {@link 
AbstractEventTimeWindowCheckpointingITCase} that have been reconfigured
+ * to use local recovery.
+ */
+public class LocalRecoveryITCase extends TestLogger {
+
+       @Rule
+       public TestName testName = new TestName();
+
+       @Test
+       public void testLocalRecoveryHeapBackendFileBased() throws Exception {
+               executeTest(
+                       FILE_ASYNC,
+                       ENABLE_FILE_BASED);
+       }
+
+       @Test
+       public void testLocalRecoveryRocksIncrementalFileBased() throws 
Exception {
+               executeTest(
+                       ROCKSDB_INCREMENTAL_ZK,
+                       ENABLE_FILE_BASED);
+       }
+
+       @Test
+       public void testLocalRecoveryRocksFullFileBased() throws Exception {
+               executeTest(
+                       ROCKSDB_FULLY_ASYNC,
+                       ENABLE_FILE_BASED);
+       }
+
+       private void executeTest(
+               StateBackendEnum backendEnum,
+               LocalRecoveryMode recoveryMode) throws Exception {
+
+               AbstractEventTimeWindowCheckpointingITCase windowChkITCase =
+                       new 
AbstractEventTimeWindowCheckpointingITCaseWithLocalRecovery(
+                               backendEnum,
+                               recoveryMode);
+
+               executeTest(windowChkITCase);
+       }
+
+       private void executeTest(AbstractEventTimeWindowCheckpointingITCase 
delegate) throws Exception {
+               delegate.name = testName;
+               delegate.tempFolder.create();
+               try {
+                       delegate.startTestCluster();
+                       delegate.testTumblingTimeWindow();
+                       delegate.stopTestCluster();
+
+                       delegate.startTestCluster();
+                       delegate.testSlidingTimeWindow();
+                       delegate.stopTestCluster();
+               } finally {
+                       delegate.tempFolder.delete();
+               }
+       }
+
+       private static class 
AbstractEventTimeWindowCheckpointingITCaseWithLocalRecovery
+               extends AbstractEventTimeWindowCheckpointingITCase {
+
+               private final LocalRecoveryMode recoveryMode;
+
+               AbstractEventTimeWindowCheckpointingITCaseWithLocalRecovery(
+                       StateBackendEnum stateBackendEnum,
+                       LocalRecoveryMode recoveryMode) {
+
+                       super(stateBackendEnum);
+                       this.recoveryMode = recoveryMode;
+               }
+
+               @Override
+               protected Configuration createClusterConfig() throws 
IOException {
+                       Configuration config = super.createClusterConfig();
+
+                       config.setString(
+                               CheckpointingOptions.LOCAL_RECOVERY,
+                               recoveryMode.toString());
+
+                       return config;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 95763bf..a23c679 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -550,7 +550,6 @@ public class RescalingITCase extends TestLogger {
                                if (savepointResponse instanceof 
JobManagerMessages.TriggerSavepointSuccess) {
                                        break;
                                }
-                               System.out.println(savepointResponse);
                        }
 
                        assertTrue(savepointResponse instanceof 
JobManagerMessages.TriggerSavepointSuccess);

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index fba3b93..537f864 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
@@ -44,6 +45,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 
 /**
@@ -69,7 +71,8 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                testExternalizedCheckpoints(
                        checkpointDir,
                        null,
-                       new 
RocksDBStateBackend(checkpointDir.toURI().toString(), true));
+                       createRocksDBStateBackend(checkpointDir, true),
+                       false);
        }
 
        @Test
@@ -78,7 +81,28 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                testExternalizedCheckpoints(
                        checkpointDir,
                        null,
-                       new 
RocksDBStateBackend(checkpointDir.toURI().toString(), false));
+                       createRocksDBStateBackend(checkpointDir, false),
+                       false);
+       }
+
+       @Test
+       public void 
testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryStandalone() 
throws Exception {
+               final File checkpointDir = temporaryFolder.newFolder();
+               testExternalizedCheckpoints(
+                       checkpointDir,
+                       null,
+                       createRocksDBStateBackend(checkpointDir, true),
+                       true);
+       }
+
+       @Test
+       public void 
testExternalizedFullRocksDBCheckpointsWithLocalRecoveryStandalone() throws 
Exception {
+               final File checkpointDir = temporaryFolder.newFolder();
+               testExternalizedCheckpoints(
+                       checkpointDir,
+                       null,
+                       createRocksDBStateBackend(checkpointDir, false),
+                       true);
        }
 
        @Test
@@ -87,8 +111,18 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                testExternalizedCheckpoints(
                        checkpointDir,
                        null,
-                       new FsStateBackend(checkpointDir.toURI().toString(), 
true));
+                       createFsStateBackend(checkpointDir),
+                       false);
+       }
 
+       @Test
+       public void testExternalizedFSCheckpointsWithLocalRecoveryStandalone() 
throws Exception {
+               final File checkpointDir = temporaryFolder.newFolder();
+               testExternalizedCheckpoints(
+                       checkpointDir,
+                       null,
+                       createFsStateBackend(checkpointDir),
+                       true);
        }
 
        @Test
@@ -100,7 +134,8 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                        testExternalizedCheckpoints(
                                checkpointDir,
                                zkServer.getConnectString(),
-                               new 
RocksDBStateBackend(checkpointDir.toURI().toString(), true));
+                               createRocksDBStateBackend(checkpointDir, true),
+                               false);
                } finally {
                        zkServer.stop();
                }
@@ -115,7 +150,40 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                        testExternalizedCheckpoints(
                                checkpointDir,
                                zkServer.getConnectString(),
-                               new 
RocksDBStateBackend(checkpointDir.toURI().toString(), false));
+                               createRocksDBStateBackend(checkpointDir, false),
+                               false);
+               } finally {
+                       zkServer.stop();
+               }
+       }
+
+       @Test
+       public void 
testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper() 
throws Exception {
+               TestingServer zkServer = new TestingServer();
+               zkServer.start();
+               try {
+                       final File checkpointDir = temporaryFolder.newFolder();
+                       testExternalizedCheckpoints(
+                               checkpointDir,
+                               zkServer.getConnectString(),
+                               createRocksDBStateBackend(checkpointDir, true),
+                               true);
+               } finally {
+                       zkServer.stop();
+               }
+       }
+
+       @Test
+       public void 
testExternalizedFullRocksDBCheckpointsWithLocalRecoveryZookeeper() throws 
Exception {
+               TestingServer zkServer = new TestingServer();
+               zkServer.start();
+               try {
+                       final File checkpointDir = temporaryFolder.newFolder();
+                       testExternalizedCheckpoints(
+                               checkpointDir,
+                               zkServer.getConnectString(),
+                               createRocksDBStateBackend(checkpointDir, false),
+                               true);
                } finally {
                        zkServer.stop();
                }
@@ -130,16 +198,45 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                        testExternalizedCheckpoints(
                                checkpointDir,
                                zkServer.getConnectString(),
-                               new 
FsStateBackend(checkpointDir.toURI().toString(), true));
+                               createFsStateBackend(checkpointDir),
+                               false);
                } finally {
                        zkServer.stop();
                }
        }
 
+       @Test
+       public void testExternalizedFSCheckpointsWithLocalRecoveryZookeeper() 
throws Exception {
+               TestingServer zkServer = new TestingServer();
+               zkServer.start();
+               try {
+                       final File checkpointDir = temporaryFolder.newFolder();
+                       testExternalizedCheckpoints(
+                               checkpointDir,
+                               zkServer.getConnectString(),
+                               createFsStateBackend(checkpointDir),
+                               true);
+               } finally {
+                       zkServer.stop();
+               }
+       }
+
+       private FsStateBackend createFsStateBackend(File checkpointDir) throws 
IOException {
+               return new FsStateBackend(checkpointDir.toURI().toString(), 
true);
+       }
+
+       private RocksDBStateBackend createRocksDBStateBackend(
+               File checkpointDir,
+               boolean incrementalCheckpointing) throws IOException {
+
+               return new 
RocksDBStateBackend(checkpointDir.toURI().toString(), incrementalCheckpointing);
+       }
+
        private void testExternalizedCheckpoints(
                File checkpointDir,
                String zooKeeperQuorum,
-               StateBackend backend) throws Exception {
+               StateBackend backend,
+               boolean localRecovery) throws Exception {
 
                final Configuration config = new Configuration();
 
@@ -152,6 +249,12 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir.toURI().toString());
                config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toURI().toString());
 
+               if (localRecovery) {
+                       config.setString(
+                               CheckpointingOptions.LOCAL_RECOVERY,
+                               
LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED.toString());
+               }
+
                // ZooKeeper recovery mode?
                if (zooKeeperQuorum != null) {
                        final File haDir = temporaryFolder.newFolder();

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
index d65c323..2968f13 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
@@ -64,7 +64,7 @@ public class ManualWindowSpeedITCase extends AbstractTestBase 
{
                String checkpoints = tempFolder.newFolder().toURI().toString();
                env.setStateBackend(new FsStateBackend(checkpoints));
 
-               env.addSource(new InfiniteTupleSource(10_000))
+               env.addSource(new InfiniteTupleSource(1_000))
                                .keyBy(0)
                                .timeWindow(Time.seconds(3))
                                .reduce(new ReduceFunction<Tuple2<String, 
Integer>>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index aed6663..aaa96fb 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -348,7 +348,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
                        boolean isKeyedState,
                        StateBackend stateBackend,
                        ClassLoader classLoader,
-                       OperatorSubtaskState operatorStateHandles,
+                       OperatorSubtaskState operatorSubtaskState,
                        Iterable<Long> input) throws Exception {
 
                try (final MockEnvironment environment = new MockEnvironment(
@@ -358,11 +358,11 @@ public class PojoSerializerUpgradeTest extends TestLogger 
{
                        256,
                        taskConfiguration,
                        executionConfig,
+                       new TestTaskStateManager(),
                        16,
                        1,
                        0,
-                       classLoader,
-                       new TestTaskStateManager())) {
+                       classLoader)) {
 
                        OneInputStreamOperatorTestHarness<Long, Long> harness = 
null;
                        try {
@@ -379,7 +379,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
                                harness.setStateBackend(stateBackend);
 
                                harness.setup();
-                               harness.initializeState(operatorStateHandles);
+                               harness.initializeState(operatorSubtaskState);
                                harness.open();
 
                                long timestamp = 0L;

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
index e33aa49..9dd0b28 100644
--- 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.security.SecurityUtils
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
@@ -40,6 +41,7 @@ import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
   * @param memoryManager MemoryManager which is responsible for Flink's 
managed memory allocation
   * @param ioManager IOManager responsible for I/O
   * @param network NetworkEnvironment for this actor
+  * @param taskManagerLocalStateStoresManager Task manager state store manager 
for this actor
   * @param numberOfSlots Number of slots for this TaskManager
   * @param highAvailabilityServices [[HighAvailabilityServices]] to create a 
leader retrieval
   *                                service for retrieving the leading 
JobManager
@@ -51,6 +53,7 @@ class TestingYarnTaskManager(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     network: NetworkEnvironment,
+    taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager,
     numberOfSlots: Int,
     highAvailabilityServices: HighAvailabilityServices,
     taskManagerMetricGroup : TaskManagerMetricGroup)
@@ -61,6 +64,7 @@ class TestingYarnTaskManager(
     memoryManager,
     ioManager,
     network,
+    taskManagerLocalStateStoresManager,
     numberOfSlots,
     highAvailabilityServices,
     taskManagerMetricGroup)

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index 08d24b4..d54ae9b 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.security.SecurityUtils
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 
@@ -40,6 +41,7 @@ class YarnTaskManager(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     network: NetworkEnvironment,
+    taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager,
     numberOfSlots: Int,
     highAvailabilityServices: HighAvailabilityServices,
     taskManagerMetricGroup: TaskManagerMetricGroup)
@@ -50,6 +52,7 @@ class YarnTaskManager(
     memoryManager,
     ioManager,
     network,
+    taskManagerLocalStateStoresManager,
     numberOfSlots,
     highAvailabilityServices,
     taskManagerMetricGroup) {

Reply via email to