http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
index f8084ca..20f66c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
@@ -23,16 +23,20 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -45,43 +49,59 @@ public class TestTaskStateManager implements 
TaskStateManager {
        private JobID jobId;
        private ExecutionAttemptID executionAttemptID;
 
-       private final Map<Long, TaskStateSnapshot> 
taskStateSnapshotsByCheckpointId;
+       private final Map<Long, TaskStateSnapshot> 
jobManagerTaskStateSnapshotsByCheckpointId;
+       private final Map<Long, TaskStateSnapshot> 
taskManagerTaskStateSnapshotsByCheckpointId;
        private CheckpointResponder checkpointResponder;
        private OneShotLatch waitForReportLatch;
+       private LocalRecoveryConfig localRecoveryDirectoryProvider;
 
        public TestTaskStateManager() {
-               this(new JobID(), new ExecutionAttemptID(), new 
TestCheckpointResponder());
+               this(TestLocalRecoveryConfig.disabled());
+       }
+
+       public TestTaskStateManager(LocalRecoveryConfig localRecoveryConfig) {
+               this(
+                       new JobID(),
+                       new ExecutionAttemptID(),
+                       new TestCheckpointResponder(),
+                       localRecoveryConfig);
        }
 
        public TestTaskStateManager(
                JobID jobId,
                ExecutionAttemptID executionAttemptID) {
-
-               this(jobId, executionAttemptID, null);
+               this(jobId, executionAttemptID, null, 
TestLocalRecoveryConfig.disabled());
        }
 
        public TestTaskStateManager(
                JobID jobId,
                ExecutionAttemptID executionAttemptID,
-               CheckpointResponder checkpointResponder) {
+               CheckpointResponder checkpointResponder,
+               LocalRecoveryConfig localRecoveryConfig) {
                this.jobId = jobId;
                this.executionAttemptID = executionAttemptID;
                this.checkpointResponder = checkpointResponder;
-               this.taskStateSnapshotsByCheckpointId = new HashMap<>();
+               this.localRecoveryDirectoryProvider = localRecoveryConfig;
+               this.jobManagerTaskStateSnapshotsByCheckpointId = new 
HashMap<>();
+               this.taskManagerTaskStateSnapshotsByCheckpointId = new 
HashMap<>();
                this.reportedCheckpointId = -1L;
        }
 
        @Override
-       public void reportStateHandles(
+       public void reportTaskStateSnapshots(
                @Nonnull CheckpointMetaData checkpointMetaData,
                @Nonnull CheckpointMetrics checkpointMetrics,
-               @Nullable TaskStateSnapshot acknowledgedState) {
+               @Nullable TaskStateSnapshot acknowledgedState,
+               @Nullable TaskStateSnapshot localState) {
 
-               if (taskStateSnapshotsByCheckpointId != null) {
-                       taskStateSnapshotsByCheckpointId.put(
-                               checkpointMetaData.getCheckpointId(),
-                               acknowledgedState);
-               }
+
+               jobManagerTaskStateSnapshotsByCheckpointId.put(
+                       checkpointMetaData.getCheckpointId(),
+                       acknowledgedState);
+
+               taskManagerTaskStateSnapshotsByCheckpointId.put(
+                       checkpointMetaData.getCheckpointId(),
+                       localState);
 
                if (checkpointResponder != null) {
                        checkpointResponder.acknowledgeCheckpoint(
@@ -99,10 +119,48 @@ public class TestTaskStateManager implements 
TaskStateManager {
                }
        }
 
+       @Nonnull
        @Override
-       public OperatorSubtaskState operatorStates(OperatorID operatorID) {
-               TaskStateSnapshot taskStateSnapshot = 
getLastTaskStateSnapshot();
-               return taskStateSnapshot != null ? 
taskStateSnapshot.getSubtaskStateByOperatorID(operatorID) : null;
+       public PrioritizedOperatorSubtaskState 
prioritizedOperatorState(OperatorID operatorID) {
+               TaskStateSnapshot jmTaskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
+               TaskStateSnapshot tmTaskStateSnapshot = 
getLastTaskManagerTaskStateSnapshot();
+
+               if (jmTaskStateSnapshot == null) {
+
+                       return 
PrioritizedOperatorSubtaskState.emptyNotRestored();
+               } else {
+
+                       OperatorSubtaskState jmOpState = 
jmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+
+                       if (jmOpState == null) {
+
+                               return 
PrioritizedOperatorSubtaskState.emptyNotRestored();
+                       } else {
+
+                               List<OperatorSubtaskState> tmStateCollection = 
Collections.emptyList();
+
+                               if (tmTaskStateSnapshot != null) {
+                                       OperatorSubtaskState tmOpState = 
tmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+                                       if (tmOpState != null) {
+                                               tmStateCollection = 
Collections.singletonList(tmOpState);
+                                       }
+                               }
+                               PrioritizedOperatorSubtaskState.Builder builder 
=
+                                       new 
PrioritizedOperatorSubtaskState.Builder(jmOpState, tmStateCollection);
+                               return builder.build();
+                       }
+               }
+       }
+
+       @Nonnull
+       @Override
+       public LocalRecoveryConfig createLocalRecoveryConfig() {
+               return 
Preconditions.checkNotNull(localRecoveryDirectoryProvider,
+                       "Local state directory was never set for this test 
object!");
+       }
+
+       public void setLocalRecoveryConfig(LocalRecoveryConfig 
recoveryDirectoryProvider) {
+               this.localRecoveryDirectoryProvider = recoveryDirectoryProvider;
        }
 
        @Override
@@ -134,13 +192,24 @@ public class TestTaskStateManager implements 
TaskStateManager {
                this.checkpointResponder = checkpointResponder;
        }
 
-       public Map<Long, TaskStateSnapshot> 
getTaskStateSnapshotsByCheckpointId() {
-               return taskStateSnapshotsByCheckpointId;
+       public Map<Long, TaskStateSnapshot> 
getJobManagerTaskStateSnapshotsByCheckpointId() {
+               return jobManagerTaskStateSnapshotsByCheckpointId;
        }
 
-       public void setTaskStateSnapshotsByCheckpointId(Map<Long, 
TaskStateSnapshot> taskStateSnapshotsByCheckpointId) {
-               this.taskStateSnapshotsByCheckpointId.clear();
-               
this.taskStateSnapshotsByCheckpointId.putAll(taskStateSnapshotsByCheckpointId);
+       public void setJobManagerTaskStateSnapshotsByCheckpointId(
+               Map<Long, TaskStateSnapshot> 
jobManagerTaskStateSnapshotsByCheckpointId) {
+               this.jobManagerTaskStateSnapshotsByCheckpointId.clear();
+               
this.jobManagerTaskStateSnapshotsByCheckpointId.putAll(jobManagerTaskStateSnapshotsByCheckpointId);
+       }
+
+       public Map<Long, TaskStateSnapshot> 
getTaskManagerTaskStateSnapshotsByCheckpointId() {
+               return taskManagerTaskStateSnapshotsByCheckpointId;
+       }
+
+       public void setTaskManagerTaskStateSnapshotsByCheckpointId(
+               Map<Long, TaskStateSnapshot> 
taskManagerTaskStateSnapshotsByCheckpointId) {
+               this.taskManagerTaskStateSnapshotsByCheckpointId.clear();
+               
this.taskManagerTaskStateSnapshotsByCheckpointId.putAll(taskManagerTaskStateSnapshotsByCheckpointId);
        }
 
        public long getReportedCheckpointId() {
@@ -151,9 +220,15 @@ public class TestTaskStateManager implements 
TaskStateManager {
                this.reportedCheckpointId = reportedCheckpointId;
        }
 
-       public TaskStateSnapshot getLastTaskStateSnapshot() {
-               return taskStateSnapshotsByCheckpointId != null ?
-                       
taskStateSnapshotsByCheckpointId.get(reportedCheckpointId)
+       public TaskStateSnapshot getLastJobManagerTaskStateSnapshot() {
+               return jobManagerTaskStateSnapshotsByCheckpointId != null ?
+                       
jobManagerTaskStateSnapshotsByCheckpointId.get(reportedCheckpointId)
+                       : null;
+       }
+
+       public TaskStateSnapshot getLastTaskManagerTaskStateSnapshot() {
+               return taskManagerTaskStateSnapshotsByCheckpointId != null ?
+                       
taskManagerTaskStateSnapshotsByCheckpointId.get(reportedCheckpointId)
                        : null;
        }
 
@@ -181,6 +256,6 @@ public class TestTaskStateManager implements 
TaskStateManager {
                }
 
                setReportedCheckpointId(latestId);
-               
setTaskStateSnapshotsByCheckpointId(taskStateSnapshotsByCheckpointId);
+               
setJobManagerTaskStateSnapshotsByCheckpointId(taskStateSnapshotsByCheckpointId);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractCheckpointStateOutputStreamTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractCheckpointStateOutputStreamTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractCheckpointStateOutputStreamTestBase.java
new file mode 100644
index 0000000..6b526f4
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractCheckpointStateOutputStreamTestBase.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Abstract base class for tests against checkpointing streams.
+ */
+public abstract class AbstractCheckpointStateOutputStreamTestBase extends 
TestLogger {
+
+       @Rule
+       public final TemporaryFolder tmp = new TemporaryFolder();
+
+       // 
------------------------------------------------------------------------
+       //  Tests
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Validates that even an empty stream creates a file and yields a non-null
+        * file state handle that points to it. The created path must be a regular
+        * file (not a directory) and must read back as empty.
+        */
+       @Test
+       public void testEmptyState() throws Exception {
+               final FileSystem fs = FileSystem.getLocalFileSystem();
+               final Path folder = baseFolder();
+               final String fileName = "myFileName";
+               final Path filePath = new Path(folder, fileName);
+
+               final FileStateHandle handle;
+               try (FSDataOutputStream stream = createTestStream(fs, folder, 
fileName)) {
+                       handle = closeAndGetResult(stream);
+               }
+
+               // must have created a handle that points to the expected file
+               assertNotNull(handle);
+               assertEquals(filePath, handle.getFilePath());
+
+               // the pointer path should exist as a regular file, not as a directory
+               assertTrue(fs.exists(handle.getFilePath()));
+               assertFalse(fs.getFileStatus(filePath).isDir());
+
+               // the contents should be empty (first read hits end-of-stream)
+               try (FSDataInputStream in = handle.openInputStream()) {
+                       assertEquals(-1, in.read());
+               }
+       }
+
+       /**
+        * Writes a large block of random bytes through the stream, mixing single-byte
+        * writes and array writes, and verifies that exactly those bytes can be read
+        * back, both through the returned handle and directly from the file the
+        * handle points to.
+        */
+       @Test
+       public void testWriteAndRead() throws Exception {
+               final FileSystem fs = FileSystem.getLocalFileSystem();
+               final Path folder = baseFolder();
+               final String fileName = "fooBarName";
+
+               final Random rnd = new Random();
+               final byte[] data = new byte[1694523];
+               // fill with random content: an all-zero array would not reveal
+               // misplaced, reordered or corrupted bytes in the comparisons below
+               rnd.nextBytes(data);
+
+               // write the data (mixed single byte writes and array writes)
+               final FileStateHandle handle;
+               try (FSDataOutputStream stream = createTestStream(fs, folder, fileName)) {
+                       for (int i = 0; i < data.length; ) {
+                               if (rnd.nextBoolean()) {
+                                       stream.write(data[i++]);
+                               } else {
+                                       // chunk of 0..31 bytes that never reaches past the end of the array
+                                       int len = rnd.nextInt(Math.min(data.length - i, 32));
+                                       stream.write(data, i, len);
+                                       i += len;
+                               }
+                       }
+                       handle = closeAndGetResult(stream);
+               }
+
+               // (1) stream from handle must hold the contents
+               try (FSDataInputStream in = handle.openInputStream()) {
+                       byte[] buffer = new byte[data.length];
+                       readFully(in, buffer);
+                       assertArrayEquals(data, buffer);
+               }
+
+               // (2) the pointer must point to a file with that contents
+               try (FSDataInputStream in = fs.open(handle.getFilePath())) {
+                       byte[] buffer = new byte[data.length];
+                       readFully(in, buffer);
+                       assertArrayEquals(data, buffer);
+               }
+       }
+
+       /**
+        * Tests that the underlying stream file is deleted upon calling close.
+        *
+        * <p>The stream is closed without asking for a result handle; the file must
+        * exist while the stream is open and must be gone after it was closed.
+        */
+       @Test
+       public void testCleanupWhenClosingStream() throws IOException {
+               final FileSystem fs = FileSystem.getLocalFileSystem();
+               final Path folder = new Path(tmp.newFolder().toURI());
+               final String fileName = "nonCreativeTestFileName";
+               final Path path = new Path(folder, fileName);
+
+               // write some test data and close the stream
+               try (FSDataOutputStream stream = createTestStream(fs, folder, fileName)) {
+                       final Random rnd = new Random();
+                       // draw the number of bytes once; evaluating nextInt() in the loop
+                       // condition would compare against a new random bound on every iteration
+                       final int numBytes = rnd.nextInt(1000);
+                       for (int i = 0; i < numBytes; i++) {
+                               stream.write(rnd.nextInt(100));
+                       }
+                       assertTrue(fs.exists(path));
+               }
+
+               assertFalse(fs.exists(path));
+       }
+
+       /**
+        * Tests that the underlying stream file is deleted if the 
closeAndGetHandle method fails.
+        */
+       @Test
+       public void testCleanupWhenFailingCloseAndGetHandle() throws 
IOException {
+               final Path folder = new Path(tmp.newFolder().toURI());
+               final String fileName = "test_name";
+               final Path filePath = new Path(folder, fileName);
+
+               final FileSystem fs = spy(new TestFs((path) -> new 
FailingCloseStream(new File(path.getPath()))));
+
+               FSDataOutputStream stream = createTestStream(fs, folder, 
fileName);
+               stream.write(new byte[] {1, 2, 3, 4, 5});
+
+               try {
+                       closeAndGetResult(stream);
+                       fail("Expected IOException");
+               }
+               catch (IOException ignored) {
+                       // expected exception
+               }
+
+               verify(fs).delete(filePath, false);
+       }
+
+       /**
+        * This test validates that a close operation can happen even while a 
'closeAndGetHandle()'
+        * call is in progress.
+        *
+        * <p>That behavior is essential for fast cancellation (concurrent 
cleanup).
+        */
+       @Test
+       public void testCloseDoesNotLock() throws Exception {
+               final Path folder = new Path(tmp.newFolder().toURI());
+               final String fileName = "this-is-ignored-anyways.file";
+
+               final FileSystem fileSystem = spy(new TestFs((path) -> new 
BlockerStream()));
+
+               final FSDataOutputStream checkpointStream =
+                       createTestStream(fileSystem, folder, fileName);
+
+               final OneShotLatch sync = new OneShotLatch();
+
+               final CheckedThread thread = new CheckedThread() {
+
+                       @Override
+                       public void go() throws Exception {
+                               sync.trigger();
+                               // that call should now block, because it 
accesses the position
+                               closeAndGetResult(checkpointStream);
+                       }
+               };
+               thread.start();
+
+               sync.await();
+               checkpointStream.close();
+
+               // the thread may or may not fail, that depends on the thread 
race
+               // it is not important for this test, important is that the 
thread does not freeze/lock up
+               try {
+                       thread.sync();
+               } catch (IOException ignored) {}
+       }
+
+       /**
+        * Creates a new test stream instance.
+        */
+       protected abstract FSDataOutputStream createTestStream(
+               FileSystem fs,
+               Path dir,
+               String fileName) throws IOException;
+
+       /**
+        * Closes the stream successfully and returns a FileStateHandle to the 
result.
+        */
+       protected abstract FileStateHandle closeAndGetResult(FSDataOutputStream 
stream) throws IOException;
+
+       // 
------------------------------------------------------------------------
+       //  utilities
+       // 
------------------------------------------------------------------------
+
+       private Path baseFolder() throws Exception {
+               return new Path(new File(tmp.newFolder(), 
UUID.randomUUID().toString()).toURI());
+       }
+
+       private static void readFully(InputStream in, byte[] buffer) throws 
IOException {
+               int pos = 0;
+               int remaining = buffer.length;
+
+               while (remaining > 0) {
+                       int read = in.read(buffer, pos, remaining);
+                       if (read == -1) {
+                               throw new EOFException();
+                       }
+
+                       pos += read;
+                       remaining -= read;
+               }
+       }
+
+       /**
+        * An output stream whose operations block until the stream is closed and
+        * then fail with an {@link IOException}. Only {@link #close()} returns
+        * normally; it releases all blocked callers.
+        */
+       private static class BlockerStream extends FSDataOutputStream {
+
+               private final OneShotLatch blocker = new OneShotLatch();
+
+               @Override
+               public long getPos() throws IOException {
+                       block();
+                       return 0L;
+               }
+
+               @Override
+               public void write(int b) throws IOException {
+                       block();
+               }
+
+               @Override
+               public void flush() throws IOException {
+                       block();
+               }
+
+               @Override
+               public void sync() throws IOException {
+                       block();
+               }
+
+               @Override
+               public void close() throws IOException {
+                       blocker.trigger();
+               }
+
+               /**
+                * Waits until {@link #close()} triggers the latch and then always throws.
+                *
+                * @throws IOException with message "closed" once the latch was triggered, or
+                *                     with message "interrupted" if the wait was interrupted
+                */
+               private void block() throws IOException {
+                       try {
+                               blocker.await();
+                       } catch (InterruptedException e) {
+                               // restore the interrupt flag for the caller and keep the cause
+                               Thread.currentThread().interrupt();
+                               throw new IOException("interrupted", e);
+                       }
+                       throw new IOException("closed");
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A local file output stream whose {@link #close()} always fails with an
+        * {@link IOException}, used to simulate a failure while closing.
+        */
+       private static class FailingCloseStream extends LocalDataOutputStream {
+
+               FailingCloseStream(File file) throws IOException {
+                       super(file);
+               }
+
+               /**
+                * Releases the underlying file and then reports the simulated failure.
+                *
+                * @throws IOException always
+                */
+               @Override
+               public void close() throws IOException {
+                       // close the real stream first so the test does not leak the open
+                       // file handle, then fail as the test expects
+                       super.close();
+                       throw new IOException("Test exception: close() fails by design.");
+               }
+       }
+
+       private static class TestFs extends LocalFileSystem {
+
+               private final FunctionWithException<Path, FSDataOutputStream, 
IOException> streamFactory;
+
+               TestFs(FunctionWithException<Path, FSDataOutputStream, 
IOException> streamFactory) {
+                       this.streamFactory = streamFactory;
+               }
+
+               @Override
+               public FSDataOutputStream create(Path filePath, WriteMode 
overwrite) throws IOException {
+                       return streamFactory.apply(filePath);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStreamTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStreamTest.java
new file mode 100644
index 0000000..9ee59d2
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStreamTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * Tests for the {@link FileBasedStateOutputStream}.
+ */
+public class FileBasedStateOutputStreamTest extends 
AbstractCheckpointStateOutputStreamTestBase {
+
+       @Override
+       protected FSDataOutputStream createTestStream(FileSystem fs, Path dir, 
String fileName) throws IOException {
+               return new FileBasedStateOutputStream(fs, new Path(dir, 
fileName));
+       }
+
+       @Override
+       protected FileStateHandle closeAndGetResult(FSDataOutputStream stream) 
throws IOException {
+               return ((FileBasedStateOutputStream) 
stream).closeAndGetHandle();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStreamTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStreamTest.java
index 5a15b4b..4803e93 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStreamTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStreamTest.java
@@ -18,291 +18,25 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
-import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.local.LocalDataOutputStream;
-import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.core.testutils.CheckedThread;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.util.function.FunctionWithException;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.EOFException;
-import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.util.Random;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
 
 /**
  * Tests for the {@link FsCheckpointMetadataOutputStream}.
  */
-public class FsCheckpointMetadataOutputStreamTest {
-
-       @Rule
-       public final TemporaryFolder tmp = new TemporaryFolder();
-
-       // 
------------------------------------------------------------------------
-       //  Tests
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Validates that even empty streams create a file and a file state 
handle.
-        */
-       @Test
-       public void testEmptyState() throws Exception {
-               final FileSystem fs = FileSystem.getLocalFileSystem();
-
-               final Path checkpointDir = Path.fromLocalFile(tmp.newFolder());
-               final Path metadataPath = new Path(checkpointDir, "myFileName");
-
-               final FsCompletedCheckpointStorageLocation location;
-               try (FsCheckpointMetadataOutputStream stream = new 
FsCheckpointMetadataOutputStream(fs, metadataPath, checkpointDir)) {
-                       location = stream.closeAndFinalizeCheckpoint();
-               }
-
-               // must have created a handle
-               assertNotNull(location);
-               assertNotNull(location.getMetadataHandle());
-               assertEquals(metadataPath, 
location.getMetadataHandle().getFilePath());
-
-               // the pointer path should exist as a file
-               assertTrue(fs.exists(metadataPath));
-               assertFalse(fs.getFileStatus(metadataPath).isDir());
-
-               // the contents should be empty
-               try (FSDataInputStream in = 
location.getMetadataHandle().openInputStream()) {
-                       assertEquals(-1, in.read());
-               }
-       }
-
-       /**
-        * Simple write and read test.
-        */
-       @Test
-       public void testWriteAndRead() throws Exception {
-               final FileSystem fs = FileSystem.getLocalFileSystem();
-
-               final Path checkpointDir = Path.fromLocalFile(tmp.newFolder());
-               final Path metadataPath = new Path(checkpointDir, "fooBarName");
-
-               final Random rnd = new Random();
-               final byte[] data = new byte[1694523];
-
-               // write the data (mixed single byte writes and array writes)
-               final FsCompletedCheckpointStorageLocation completed;
-               try (FsCheckpointMetadataOutputStream stream = new 
FsCheckpointMetadataOutputStream(fs, metadataPath, checkpointDir)) {
-                       for (int i = 0; i < data.length;) {
-                               if (rnd.nextBoolean()) {
-                                       stream.write(data[i++]);
-                               }
-                               else {
-                                       int len = 
rnd.nextInt(Math.min(data.length - i, 32));
-                                       stream.write(data, i, len);
-                                       i += len;
-                               }
-                       }
-                       completed = stream.closeAndFinalizeCheckpoint();
-               }
-
-               // (1) stream from handle must hold the contents
-               try (FSDataInputStream in = 
completed.getMetadataHandle().openInputStream()) {
-                       byte[] buffer = new byte[data.length];
-                       readFully(in, buffer);
-                       assertArrayEquals(data, buffer);
-               }
-
-               // (2) the pointer must point to a file with that contents
-               try (FSDataInputStream in = 
fs.open(completed.getMetadataHandle().getFilePath())) {
-                       byte[] buffer = new byte[data.length];
-                       readFully(in, buffer);
-                       assertArrayEquals(data, buffer);
-               }
-       }
-
-       /**
-        * Tests that the underlying stream file is deleted upon calling close.
-        */
-       @Test
-       public void testCleanupWhenClosingStream() throws IOException {
-               final FileSystem fs = FileSystem.getLocalFileSystem();
-
-               final Path checkpointDir = Path.fromLocalFile(tmp.newFolder());
-               final Path metadataPath = new Path(checkpointDir, 
"nonCreativeTestFileName");
-
-               // write some test data and close the stream
-               try (FsCheckpointMetadataOutputStream stream = new 
FsCheckpointMetadataOutputStream(fs, metadataPath, checkpointDir)) {
-                       Random rnd = new Random();
-                       for (int i = 0; i < rnd.nextInt(1000); i++) {
-                               stream.write(rnd.nextInt(100));
-                       }
-                       assertTrue(fs.exists(metadataPath));
-               }
-
-               assertFalse(fs.exists(metadataPath));
-               assertTrue(fs.exists(checkpointDir));
-       }
-
-       /**
-        * Tests that the underlying stream file is deleted if the 
closeAndGetHandle method fails.
-        */
-       @Test
-       public void testCleanupWhenFailingCloseAndGetHandle() throws 
IOException {
-               final Path checkpointDir = Path.fromLocalFile(tmp.newFolder());
-               final Path metadataPath = new Path(checkpointDir, "test_name");
-
-               final FileSystem fs = spy(new TestFs((path) -> new 
FailingCloseStream(new File(path.getPath()))));
-
-               FsCheckpointMetadataOutputStream stream = new 
FsCheckpointMetadataOutputStream(fs, metadataPath, checkpointDir);
-               stream.write(new byte[] {1, 2, 3, 4, 5});
-
-               try {
-                       stream.closeAndFinalizeCheckpoint();
-                       fail("Expected IOException");
-               }
-               catch (IOException ignored) {
-                       // expected exception
-               }
-
-               verify(fs).delete(metadataPath, false);
-       }
+public class FsCheckpointMetadataOutputStreamTest extends 
AbstractCheckpointStateOutputStreamTestBase {
 
-       /**
-        * This test validates that a close operation can happen even while a 
'closeAndGetHandle()'
-        * call is in progress.
-        *
-        * <p>That behavior is essential for fast cancellation (concurrent 
cleanup).
-        */
-       @Test
-       public void testCloseDoesNotLock() throws Exception {
-               final Path checkpointDir = Path.fromLocalFile(tmp.newFolder());
-               final Path metadataPath = new Path(checkpointDir, 
"this-is-ignored-anyways.file");
-
-               final FileSystem fileSystem = spy(new TestFs((path) -> new 
BlockerStream()));
-
-               final FsCheckpointMetadataOutputStream checkpointStream =
-                               new 
FsCheckpointMetadataOutputStream(fileSystem, metadataPath, checkpointDir);
-
-               final OneShotLatch sync = new OneShotLatch();
-
-               final CheckedThread thread = new CheckedThread() {
-
-                       @Override
-                       public void go() throws Exception {
-                               sync.trigger();
-                               // that call should now block, because it 
accesses the position
-                               checkpointStream.closeAndFinalizeCheckpoint();
-                       }
-               };
-               thread.start();
-
-               sync.await();
-               checkpointStream.close();
-
-               // the thread may or may not fail, that depends on the thread 
race
-               // it is not important for this test, important is that the 
thread does not freeze/lock up
-               try {
-                       thread.sync();
-               } catch (IOException ignored) {}
+       @Override
+       protected FSDataOutputStream createTestStream(FileSystem fs, Path dir, 
String fileName) throws IOException {
+               Path fullPath = new Path(dir, fileName);
+               return new FsCheckpointMetadataOutputStream(fs, fullPath, dir);
        }
 
-       // 
------------------------------------------------------------------------
-       //  utilities
-       // 
------------------------------------------------------------------------
-
-       private static void readFully(InputStream in, byte[] buffer) throws 
IOException {
-               int pos = 0;
-               int remaining = buffer.length;
-
-               while (remaining > 0) {
-                       int read = in.read(buffer, pos, remaining);
-                       if (read == -1) {
-                               throw new EOFException();
-                       }
-
-                       pos += read;
-                       remaining -= read;
-               }
-       }
-
-       private static class BlockerStream extends FSDataOutputStream {
-
-               private final OneShotLatch blocker = new OneShotLatch();
-
-               @Override
-               public long getPos() throws IOException {
-                       block();
-                       return 0L;
-               }
-
-               @Override
-               public void write(int b) throws IOException {
-                       block();
-               }
-
-               @Override
-               public void flush() throws IOException {
-                       block();
-               }
-
-               @Override
-               public void sync() throws IOException {
-                       block();
-               }
-
-               @Override
-               public void close() throws IOException {
-                       blocker.trigger();
-               }
-
-               private void block() throws IOException {
-                       try {
-                               blocker.await();
-                       } catch (InterruptedException e) {
-                               throw new IOException("interrupted");
-                       }
-                       throw new IOException("closed");
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static class FailingCloseStream extends LocalDataOutputStream {
-
-               FailingCloseStream(File file) throws IOException {
-                       super(file);
-               }
-
-               @Override
-               public void close() throws IOException {
-                       throw new IOException();
-               }
-       }
-
-       private static class TestFs extends LocalFileSystem {
-
-               private final FunctionWithException<Path, FSDataOutputStream, 
IOException> streamFactory;
-
-               TestFs(FunctionWithException<Path, FSDataOutputStream, 
IOException> streamFactory) {
-                       this.streamFactory = streamFactory;
-               }
-
-               @Override
-               public FSDataOutputStream create(Path filePath, WriteMode 
overwrite) throws IOException {
-                       return streamFactory.apply(filePath);
-               }
+       @Override
+       protected FileStateHandle closeAndGetResult(FSDataOutputStream stream) 
throws IOException {
+               return ((FsCheckpointMetadataOutputStream) 
stream).closeAndFinalizeCheckpoint().getMetadataHandle();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
index 3ac3b6b..d83aa9f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -21,11 +21,12 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+
 import org.junit.Test;
 
 import java.io.BufferedInputStream;
@@ -64,7 +65,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest 
extends HeapStateBackend
                        try (BufferedInputStream bis = new 
BufferedInputStream((new FileInputStream(resource.getFile())))) {
                                stateHandle = 
InstantiationUtil.deserializeObject(bis, 
Thread.currentThread().getContextClassLoader());
                        }
-                       
keyedBackend.restore(Collections.<KeyedStateHandle>singleton(stateHandle));
+                       
keyedBackend.restore(StateObjectCollection.singleton(stateHandle));
                        final ListStateDescriptor<Long> stateDescr = new 
ListStateDescriptor<>("my-state", Long.class);
                        stateDescr.initializeSerializerUnlessSet(new 
ExecutionConfig());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
index b10c2c0..bf428dc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -54,6 +56,7 @@ public abstract class HeapStateBackendTestBase {
                        16,
                        new KeyGroupRange(0, 15),
                        async,
-                       new ExecutionConfig());
+                       new ExecutionConfig(),
+                       TestLocalRecoveryConfig.disabled());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
index b40f179..d958fcc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
@@ -68,7 +68,7 @@ public class BackendForTestStream extends MemoryStateBackend {
        // 
------------------------------------------------------------------------
 
        public interface StreamFactory
-                       extends 
SupplierWithException<CheckpointStateOutputStream, Exception>, 
java.io.Serializable {}
+                       extends 
SupplierWithException<CheckpointStateOutputStream, IOException>, 
java.io.Serializable {}
 
        // 
------------------------------------------------------------------------
 
@@ -119,7 +119,7 @@ public class BackendForTestStream extends 
MemoryStateBackend {
                }
 
                @Override
-               public CheckpointStateOutputStream 
createCheckpointStateOutputStream(CheckpointedStateScope scope) throws 
Exception {
+               public CheckpointStateOutputStream 
createCheckpointStateOutputStream(CheckpointedStateScope scope) throws 
IOException {
                        return streamFactory.get();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index bc6f238..8dcd642 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.testutils.category.OldAndFlip6;
 import org.apache.flink.util.TestLogger;
@@ -100,6 +101,8 @@ public class NetworkBufferCalculationTest extends 
TestLogger {
                return new TaskManagerServicesConfiguration(
                        InetAddress.getLoopbackAddress(),
                        new String[] {},
+                       new String[] {},
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
                        networkConfig,
                        QueryableStateConfiguration.disabled(),
                        1,

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 8f4ec5d..d693f47 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -50,6 +50,8 @@ import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -64,6 +66,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
+import java.io.File;
 import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.Collection;
@@ -131,6 +134,14 @@ public class TaskExecutorITCase extends TestLogger {
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime());
 
+               final File[] taskExecutorLocalStateRootDirs =
+                       new File[]{new 
File(System.getProperty("java.io.tmpdir"), "localRecovery")};
+
+               final TaskExecutorLocalStateStoresManager taskStateManager = 
new TaskExecutorLocalStateStoresManager(
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       taskExecutorLocalStateRootDirs,
+                       rpcService.getExecutor());
+
                ResourceManager<ResourceID> resourceManager = new 
StandaloneResourceManager(
                        rpcService,
                        FlinkResourceManager.RESOURCE_MANAGER_NAME,
@@ -147,6 +158,7 @@ public class TaskExecutorITCase extends TestLogger {
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
+                       .setTaskStateManager(taskStateManager)
                        .build();
 
                TaskExecutor taskExecutor = new TaskExecutor(

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index d7a1860..e972feb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -74,6 +75,8 @@ import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -95,6 +98,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Matchers;
@@ -103,6 +107,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -137,6 +142,9 @@ import static org.mockito.Mockito.when;
 @Category(Flip6.class)
 public class TaskExecutorTest extends TestLogger {
 
+       @Rule
+       public final TemporaryFolder tmp = new TemporaryFolder();
+
        private static final Time timeout = Time.milliseconds(10000L);
 
        private TestingRpcService rpc;
@@ -227,10 +235,16 @@ public class TaskExecutorTest extends TestLogger {
                final SimpleJobMasterGateway jobMasterGateway = new 
SimpleJobMasterGateway(
                        CompletableFuture.completedFuture(new 
JMTMRegistrationSuccess(jmResourceId)));
 
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
                        .setJobLeaderService(jobLeaderService)
+                       .setTaskStateManager(localStateStoresManager)
                        .build();
 
                final TaskExecutor taskManager = new TaskExecutor(
@@ -310,9 +324,15 @@ public class TaskExecutorTest extends TestLogger {
 
                HeartbeatServices heartbeatServices = new 
HeartbeatServices(heartbeatInterval, heartbeatTimeout);
 
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
+                       .setTaskStateManager(localStateStoresManager)
                        .build();
 
                final TaskExecutor taskManager = new TaskExecutor(
@@ -439,9 +459,15 @@ public class TaskExecutorTest extends TestLogger {
                        }
                );
 
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
+                       .setTaskStateManager(localStateStoresManager)
                        .build();
 
                final TaskExecutor taskManager = new TaskExecutor(
@@ -518,9 +544,15 @@ public class TaskExecutorTest extends TestLogger {
                final SlotReport slotReport = new SlotReport();
                
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
 
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
+                       .setTaskStateManager(localStateStoresManager)
                        .build();
 
                TaskExecutor taskManager = new TaskExecutor(
@@ -575,9 +607,15 @@ public class TaskExecutorTest extends TestLogger {
                final SlotReport slotReport = new SlotReport();
                
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
 
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
+                       .setTaskStateManager(localStateStoresManager)
                        .build();
 
                TaskExecutor taskManager = new TaskExecutor(
@@ -690,10 +728,16 @@ public class TaskExecutorTest extends TestLogger {
                when(networkEnvironment.createKvStateTaskRegistry(eq(jobId), 
eq(jobVertexId))).thenReturn(mock(TaskKvStateRegistry.class));
                
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setNetworkEnvironment(networkEnvironment)
                        .setTaskSlotTable(taskSlotTable)
                        .setJobManagerTable(jobManagerTable)
+                       .setTaskStateManager(localStateStoresManager)
                        .build();
 
                TaskExecutor taskManager = new TaskExecutor(
@@ -787,11 +831,17 @@ public class TaskExecutorTest extends TestLogger {
                final SlotID slotId = new 
SlotID(taskManagerLocation.getResourceID(), 0);
                final SlotOffer slotOffer = new SlotOffer(allocationId, 0, 
ResourceProfile.UNKNOWN);
 
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
                        .setJobManagerTable(jobManagerTable)
                        .setJobLeaderService(jobLeaderService)
+                       .setTaskStateManager(localStateStoresManager)
                        .build();
 
                TaskExecutor taskManager = new TaskExecutor(
@@ -891,11 +941,17 @@ public class TaskExecutorTest extends TestLogger {
                rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
                rpc.registerGateway(jobManagerAddress, jobMasterGateway);
 
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
                        .setJobManagerTable(jobManagerTable)
                        .setJobLeaderService(jobLeaderService)
+                       .setTaskStateManager(localStateStoresManager)
                        .build();
 
                TaskExecutor taskManager = new TaskExecutor(
@@ -1017,12 +1073,18 @@ public class TaskExecutorTest extends TestLogger {
 
                final NetworkEnvironment networkMock = 
mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
 
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setNetworkEnvironment(networkMock)
                        .setTaskSlotTable(taskSlotTable)
                        .setJobLeaderService(jobLeaderService)
                        .setJobManagerTable(jobManagerTable)
+                       .setTaskStateManager(localStateStoresManager)
                        .build();
 
                final TaskExecutor taskManager = new TaskExecutor(
@@ -1130,10 +1192,16 @@ public class TaskExecutorTest extends TestLogger {
                final JMTMRegistrationSuccess registrationMessage = new 
JMTMRegistrationSuccess(ResourceID.generate());
                final JobManagerTable jobManagerTableMock = spy(new 
JobManagerTable());
 
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setJobManagerTable(jobManagerTableMock)
                        .setJobLeaderService(jobLeaderService)
+                       .setTaskStateManager(localStateStoresManager)
                        .build();
 
                final TaskExecutor taskExecutor = new TaskExecutor(
@@ -1198,9 +1266,15 @@ public class TaskExecutorTest extends TestLogger {
 
                rpc.registerGateway(rmAddress, rmGateway);
 
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
+                       .setTaskStateManager(localStateStoresManager)
                        .build();
 
                final TaskExecutor taskExecutor = new TaskExecutor(
@@ -1248,9 +1322,15 @@ public class TaskExecutorTest extends TestLogger {
                        Collections.singleton(ResourceProfile.UNKNOWN),
                        timerService);
 
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setTaskSlotTable(taskSlotTable)
+                       .setTaskStateManager(localStateStoresManager)
                        .build();
 
                final TaskExecutor taskExecutor = new TaskExecutor(

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
index 2bd39f8..3af9a46 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
@@ -63,8 +63,7 @@ public class TaskManagerServicesBuilder {
                taskSlotTable = mock(TaskSlotTable.class);
                jobManagerTable = new JobManagerTable();
                jobLeaderService = new JobLeaderService(taskManagerLocation);
-               taskStateManager = new TaskExecutorLocalStateStoresManager();
-
+               taskStateManager = 
mock(TaskExecutorLocalStateStoresManager.class);
        }
 
        public TaskManagerServicesBuilder 
setTaskManagerLocation(TaskManagerLocation taskManagerLocation) {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index e8ecc56..581d8ed 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
@@ -45,6 +46,8 @@ import 
org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
@@ -158,6 +161,11 @@ public class TaskManagerComponentsStartupShutdownTest 
extends TestLogger {
 
                        network.start();
 
+                       TaskExecutorLocalStateStoresManager storesManager = new 
TaskExecutorLocalStateStoresManager(
+                               LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                               ioManager.getSpillingDirectories(),
+                               Executors.directExecutor());
+
                        MetricRegistryConfiguration metricRegistryConfiguration 
= MetricRegistryConfiguration.fromConfiguration(config);
 
                        // create the task manager
@@ -169,6 +177,7 @@ public class TaskManagerComponentsStartupShutdownTest 
extends TestLogger {
                                memManager,
                                ioManager,
                                network,
+                               storesManager,
                                numberOfSlots,
                                highAvailabilityServices,
                                new 
TaskManagerMetricGroup(NoOpMetricRegistry.INSTANCE, 
connectionInfo.getHostname(), 
connectionInfo.getResourceID().getResourceIdString()));

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index 107826f..fce9620 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -18,15 +18,13 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import static org.junit.Assert.*;
-
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
@@ -36,6 +34,8 @@ import org.apache.flink.runtime.util.StartupUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
+import akka.actor.ActorSystem;
+import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -43,16 +43,19 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import scala.Option;
-
 import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.BindException;
+import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.List;
 
+import scala.Option;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Tests that check how the TaskManager behaves when encountering startup
  * problems.
@@ -246,11 +249,11 @@ public class TaskManagerStartupTest extends TestLogger {
                        
cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
                        cfg.setInteger(TaskManagerOptions.DATA_PORT, 
blocker.getLocalPort());
                        cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 1L);
-
+                       ActorSystem actorSystem = 
AkkaUtils.createLocalActorSystem(cfg);
                        TaskManager.startTaskManagerComponentsAndActor(
                                cfg,
                                ResourceID.generate(),
-                               null,
+                               actorSystem,
                                highAvailabilityServices,
                                NoOpMetricRegistry.INSTANCE,
                                "localhost",

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
index bc9b04f..ce41b10 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -39,9 +39,9 @@ public class BlockerCheckpointStreamFactory implements 
CheckpointStreamFactory {
        protected volatile OneShotLatch blocker;
        protected volatile OneShotLatch waiter;
 
-       MemCheckpointStreamFactory.MemoryCheckpointOutputStream 
lastCreatedStream;
+       BlockingCheckpointOutputStream lastCreatedStream;
 
-       public MemCheckpointStreamFactory.MemoryCheckpointOutputStream 
getLastCreatedStream() {
+       public BlockingCheckpointOutputStream getLastCreatedStream() {
                return lastCreatedStream;
        }
 
@@ -70,80 +70,13 @@ public class BlockerCheckpointStreamFactory implements 
CheckpointStreamFactory {
        }
 
        @Override
-       public CheckpointStateOutputStream 
createCheckpointStateOutputStream(CheckpointedStateScope scope) throws 
Exception {
-               this.lastCreatedStream = new 
MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
-
-                       private int afterNInvocations = afterNumberInvocations;
-                       private final OneShotLatch streamBlocker = blocker;
-                       private final OneShotLatch streamWaiter = waiter;
-
-                       @Override
-                       public void write(int b) throws IOException {
-
-                               unblockWaiter();
-
-                               if (afterNInvocations > 0) {
-                                       --afterNInvocations;
-                               } else {
-                                       awaitBlocker();
-                               }
-
-                               try {
-                                       super.write(b);
-                               } catch (IOException ex) {
-                                       unblockWaiter();
-                                       throw ex;
-                               }
-
-                               if (0 == afterNInvocations) {
-                                       unblockWaiter();
-                               }
-
-                               // We also check for close here, in case the 
underlying stream does not do this
-                               if (isClosed()) {
-                                       throw new IOException("Stream closed.");
-                               }
-                       }
-
-                       //We override this to ensure that writes go through the 
blocking #write(int) method!
-                       @Override
-                       public void write(byte[] b, int off, int len) throws 
IOException {
-                               for (int i = 0; i < len; i++) {
-                                       write(b[off + i]);
-                               }
-                       }
-
-                       @Override
-                       public void close() {
-                               super.close();
-                               // trigger all the latches, essentially all 
blocking ops on the stream should resume after close.
-                               unblockAll();
-                       }
-
-                       private void unblockWaiter() {
-                               if (null != streamWaiter) {
-                                       streamWaiter.trigger();
-                               }
-                       }
-
-                       private void awaitBlocker() {
-                               if (null != streamBlocker) {
-                                       try {
-                                               streamBlocker.await();
-                                       } catch (InterruptedException ignored) {
-                                       }
-                               }
-                       }
-
-                       private void unblockAll() {
-                               if (null != streamWaiter) {
-                                       streamWaiter.trigger();
-                               }
-                               if (null != streamBlocker) {
-                                       streamBlocker.trigger();
-                               }
-                       }
-               };
+       public CheckpointStateOutputStream 
createCheckpointStateOutputStream(CheckpointedStateScope scope) throws 
IOException {
+
+               this.lastCreatedStream = new BlockingCheckpointOutputStream(
+                       new 
MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize),
+                       waiter,
+                       blocker,
+                       afterNumberInvocations);
 
                return lastCreatedStream;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java
new file mode 100644
index 0000000..fd8e59d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingCheckpointOutputStream.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This test utility class provides a CheckpointStateOutputStream (which is also a FSDataOutputStream)
+ * that can block on a latch while writing, and that triggers a second latch which the creator can
+ * wait on until the stream has reached its blocking position or has been closed. This is typically
+ * used to test that a blocking write can be interrupted / closed.
+ *
+ * <p>A call to #write(int) at the configured blocking position first triggers the wait-for-blocking
+ * latch and then awaits the unblock latch. #close() triggers both latches, as does
+ * #closeAndGetHandle() once the call to the delegate has returned. The delegate and both latches are
+ * optional; without a delegate nothing is written and only the position is advanced.
+ */
+public class BlockingCheckpointOutputStream extends 
CheckpointStreamFactory.CheckpointStateOutputStream {
+
+       /** Optional delegate stream to which all ops are forwarded. May be null. */
+       private final FSDataOutputStream delegate;
+
+       /**
+        * Optional latch on which the stream blocks in #write(int) at the blocking position. It is
+        * released either by the test or by closing this stream.
+        */
+       private final OneShotLatch triggerUnblock;
+
+       /**
+        * Optional latch on which the test can block until the stream has arrived at the desired
+        * blocking position. It is also triggered when the stream is closed.
+        */
+       private final OneShotLatch waitForBlocking;
+
+       /** Closed flag, set exactly once by #close() or #closeAndGetHandle(). */
+       private final AtomicBoolean closed;
+
+       /** The write position at which this will block. 0 for the constructors that do not take a position. */
+       private final long blockAtPosition;
+
+       /** The current write position; starts at the delegate's position, or at 0 without a delegate. */
+       private long position;
+
+       public BlockingCheckpointOutputStream(
+               @Nullable OneShotLatch waitForBlocking,
+               @Nullable OneShotLatch triggerUnblock) {
+               this(null, waitForBlocking, triggerUnblock, 0L);
+       }
+
+       public BlockingCheckpointOutputStream(
+               @Nullable FSDataOutputStream delegate,
+               @Nullable OneShotLatch waitForBlock,
+               @Nullable OneShotLatch triggerUnblock) {
+               this(delegate, waitForBlock, triggerUnblock, 0L);
+       }
+
+       public BlockingCheckpointOutputStream(
+               @Nullable FSDataOutputStream delegate,
+               @Nullable OneShotLatch waitForBlocking,
+               @Nullable OneShotLatch triggerUnblock,
+               long blockAtPosition) {
+
+               this.delegate = delegate;
+               this.triggerUnblock = triggerUnblock;
+               this.waitForBlocking = waitForBlocking;
+               this.blockAtPosition = blockAtPosition;
+               if (delegate != null) {
+                       try {
+                               this.position = delegate.getPos();
+                       } catch (IOException e) {
+                               throw new RuntimeException(e);
+                       }
+               } else {
+                       this.position = 0;
+               }
+               this.closed = new AtomicBoolean(false);
+       }
+
+       @Override
+       public void write(int b) throws IOException {
+
+               if (position == blockAtPosition) {
+                       unblockWaiter();
+                       awaitUnblocker();
+               }
+
+               if (delegate != null) {
+                       try {
+                               delegate.write(b);
+                       } catch (IOException ex) {
+                               unblockWaiter();
+                               throw ex;
+                       }
+               }
+
+               // We also check for close here, in case the underlying stream 
does not do this
+               if (closed.get()) {
+                       throw new IOException("Stream closed.");
+               }
+
+               ++position;
+       }
+
+       //We override this to ensure that writes go through the blocking 
#write(int) method!
+       @Override
+       public void write(byte[] b, int off, int len) throws IOException {
+               for (int i = 0; i < len; i++) {
+                       write(b[off + i]);
+               }
+       }
+
+       @Override
+       public long getPos() throws IOException {
+               return position;
+       }
+
+       @Override
+       public void flush() throws IOException {
+               if (delegate != null) {
+                       delegate.flush();
+               }
+       }
+
+       @Override
+       public void sync() throws IOException {
+               if (delegate != null) {
+                       delegate.sync();
+               }
+       }
+
+       @Override
+       public void close() {
+               if (closed.compareAndSet(false, true)) {
+                       if (delegate != null) {
+                               IOUtils.closeQuietly(delegate);
+                       }
+                       // trigger all the latches, essentially all blocking 
ops on the stream should resume after close.
+                       unblockAll();
+               }
+       }
+
+       private void unblockWaiter() {
+               if (null != waitForBlocking) {
+                       waitForBlocking.trigger();
+               }
+       }
+
+       private void awaitUnblocker() {
+               if (null != triggerUnblock) {
+                       try {
+                               triggerUnblock.await();
+                       } catch (InterruptedException ignored) {
+                       }
+               }
+       }
+
+       private void unblockAll() {
+               if (null != waitForBlocking) {
+                       waitForBlocking.trigger();
+               }
+               if (null != triggerUnblock) {
+                       triggerUnblock.trigger();
+               }
+       }
+
+       @Nullable
+       @Override
+       public StreamStateHandle closeAndGetHandle() throws IOException {
+
+               if (!closed.compareAndSet(false, true)) {
+                       throw new IOException("Stream was already closed!");
+               }
+
+               if (delegate instanceof 
CheckpointStreamFactory.CheckpointStateOutputStream) {
+                       StreamStateHandle streamStateHandle =
+                               
((CheckpointStreamFactory.CheckpointStateOutputStream) 
delegate).closeAndGetHandle();
+                       unblockAll();
+                       return streamStateHandle;
+               } else {
+                       unblockAll();
+                       throw new IOException("Delegate is not a 
CheckpointStateOutputStream!");
+               }
+       }
+
+       public boolean isClosed() {
+               return closed.get();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingFSDataInputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingFSDataInputStream.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingFSDataInputStream.java
new file mode 100644
index 0000000..4721980
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingFSDataInputStream.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This test utility class provides a {@link FSDataInputStream} that can block on a latch while
+ * reading, and that triggers a second latch which the creator can wait on until the stream has
+ * reached its blocking position or has been closed. This is typically used to test that a blocking
+ * read can be interrupted / closed.
+ *
+ * <p>A call to #read() at the configured blocking position first triggers the wait-for-block latch
+ * and then awaits the unblock latch; #close() triggers both latches. The delegate and both latches
+ * are optional; without a delegate every successful read returns 0 and only the position is advanced.
+ */
+public class BlockingFSDataInputStream extends FSDataInputStream {
+
+       /** Optional delegate stream to which all ops are forwarded. May be null. */
+       private final FSDataInputStream delegate;
+
+       /**
+        * Optional latch on which the stream blocks in #read() at the blocking position. It is
+        * released either by the test or by closing this stream.
+        */
+       private final OneShotLatch triggerUnblock;
+
+       /**
+        * Optional latch on which the test can block until the stream has arrived at the desired
+        * blocking position. It is also triggered when the stream is closed.
+        */
+       private final OneShotLatch waitUntilStreamBlocked;
+
+       /** Closed flag, set exactly once by #close(). */
+       private final AtomicBoolean closed;
+
+       /** The read position at which this will block. 0 for the constructors that do not take a position. */
+       private final long blockAtPosition;
+
+       /** The current read position; starts at the delegate's position, or at 0 without a delegate. */
+       private long position;
+
+       public BlockingFSDataInputStream(
+               @Nullable OneShotLatch waitForBlock,
+               @Nullable OneShotLatch triggerUnblock) {
+               this(null, waitForBlock, triggerUnblock, 0L);
+       }
+
+       public BlockingFSDataInputStream(
+               @Nullable FSDataInputStream delegate,
+               @Nullable OneShotLatch waitForBlock,
+               @Nullable OneShotLatch triggerUnblock) {
+               this(delegate, waitForBlock, triggerUnblock, 0L);
+       }
+
+       public BlockingFSDataInputStream(
+               @Nullable FSDataInputStream delegate,
+               @Nullable OneShotLatch waitForBlock,
+               @Nullable OneShotLatch triggerUnblock,
+               long blockAtPosition) {
+
+               this.delegate = delegate;
+               this.triggerUnblock = triggerUnblock;
+               this.waitUntilStreamBlocked = waitForBlock;
+               this.blockAtPosition = blockAtPosition;
+               if (delegate != null) {
+                       try {
+                               this.position = delegate.getPos();
+                       } catch (IOException e) {
+                               throw new RuntimeException(e);
+                       }
+               } else {
+                       this.position = 0;
+               }
+               this.closed = new AtomicBoolean(false);
+       }
+
+       @Override
+       public void seek(long desired) throws IOException {
+               if (delegate != null) {
+                       delegate.seek(desired);
+               }
+               this.position = desired;
+       }
+
+       @Override
+       public long getPos() throws IOException {
+               return position;
+       }
+
+       @Override
+       public int read() throws IOException {
+
+               if (position == blockAtPosition) {
+                       unblockWaiter();
+                       awaitBlocker();
+               }
+
+               int val = 0;
+               if (delegate != null) {
+                       try {
+                               val = delegate.read();
+                       } catch (IOException ex) {
+                               unblockWaiter();
+                               throw ex;
+                       }
+               }
+
+               // We also check for close here, in case the underlying stream 
does not do this
+               if (closed.get()) {
+                       throw new IOException("Stream closed.");
+               } else {
+                       ++position;
+                       return val;
+               }
+       }
+
+
+       @Override
+       public int read(byte[] b, int off, int len) throws IOException {
+               // We override this to ensure that we use the blocking read 
method internally.
+               if (b == null) {
+                       throw new NullPointerException();
+               } else if (off < 0 || len < 0 || len > b.length - off) {
+                       throw new IndexOutOfBoundsException();
+               } else if (len == 0) {
+                       return 0;
+               }
+
+               int c = read();
+               if (c == -1) {
+                       return -1;
+               }
+               b[off] = (byte) c;
+
+               int i = 1;
+               try {
+                       for (; i < len; i++) {
+                               c = read();
+                               if (c == -1) {
+                                       break;
+                               }
+                               b[off + i] = (byte) c;
+                       }
+               } catch (IOException ee) {
+               }
+               return i;
+       }
+
+       @Override
+       public void close() {
+               if (closed.compareAndSet(false, true)) {
+                       if (delegate != null) {
+                               IOUtils.closeQuietly(delegate);
+                       }
+                       // trigger all the latches, essentially all blocking 
ops on the stream should resume after close.
+                       unblockAll();
+               }
+       }
+
+       private void unblockWaiter() {
+               if (null != waitUntilStreamBlocked) {
+                       waitUntilStreamBlocked.trigger();
+               }
+       }
+
+       private void awaitBlocker() {
+               if (null != triggerUnblock) {
+                       try {
+                               triggerUnblock.await();
+                       } catch (InterruptedException ignored) {
+                       }
+               }
+       }
+
+       private void unblockAll() {
+               if (null != waitUntilStreamBlocked) {
+                       waitUntilStreamBlocked.trigger();
+               }
+               if (null != triggerUnblock) {
+                       triggerUnblock.trigger();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index fe293d7..cc0d4fc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -56,8 +56,10 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskLocalStateStore;
+import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -68,8 +70,9 @@ import 
org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.testutils.TestJvmProcess;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.SerializedValue;
-
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
@@ -86,6 +89,9 @@ import static org.mockito.Mockito.when;
  */
 public class JvmExitOnFatalErrorTest {
 
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
        @Test
        public void testExitJvmOnOutOfMemory() throws Exception {
                // this test works only on linux
@@ -138,12 +144,13 @@ public class JvmExitOnFatalErrorTest {
                        System.err.println("creating task");
 
                        // we suppress process exits via errors here to not
-                       // have a test that exits accidentally due to a programming error 
+                       // have a test that exits accidentally due to a programming error
                        try {
                                final Configuration taskManagerConfig = new 
Configuration();
                                
taskManagerConfig.setBoolean(TaskManagerOptions.KILL_ON_OUT_OF_MEMORY, true);
 
                                final JobID jid = new JobID();
+                               final AllocationID allocationID = new 
AllocationID();
                                final JobVertexID jobVertexId = new 
JobVertexID();
                                final ExecutionAttemptID executionAttemptID = 
new ExecutionAttemptID();
                                final AllocationID slotAllocationId = new 
AllocationID();
@@ -172,7 +179,15 @@ public class JvmExitOnFatalErrorTest {
                                BlobCacheService blobService =
                                        new 
BlobCacheService(mock(PermanentBlobCache.class), 
mock(TransientBlobCache.class));
 
-                               final TaskLocalStateStore localStateStore = new 
TaskLocalStateStore(jid, jobVertexId, 0);
+                               final TaskLocalStateStore localStateStore =
+                                       new TaskLocalStateStoreImpl(
+                                               jid,
+                                               allocationID,
+                                               jobVertexId,
+                                               0,
+                                               
TestLocalRecoveryConfig.disabled(),
+                                               executor);
+
                                final TaskStateManager slotStateManager =
                                        new TaskStateManagerImpl(
                                                jid,

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestByteStreamStateHandleDeepCompare.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestByteStreamStateHandleDeepCompare.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestByteStreamStateHandleDeepCompare.java
deleted file mode 100644
index 7d8797b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestByteStreamStateHandleDeepCompare.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Arrays;
-
-public class TestByteStreamStateHandleDeepCompare extends 
ByteStreamStateHandle {
-
-       private static final long serialVersionUID = -4946526195523509L;
-
-       public TestByteStreamStateHandleDeepCompare(String handleName, byte[] 
data) {
-               super(handleName, data);
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (!super.equals(o)) {
-                       return false;
-               }
-               ByteStreamStateHandle other = (ByteStreamStateHandle) o;
-               return Arrays.equals(getData(), other.getData());
-       }
-
-       @Override
-       public int hashCode() {
-               return 31 * super.hashCode() + Arrays.hashCode(getData());
-       }
-
-       public static StreamStateHandle fromSerializable(String handleName, 
Serializable value) throws IOException {
-               return new TestByteStreamStateHandleDeepCompare(handleName, 
InstantiationUtil.serializeObject(value));
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index da753ae..5e60be9 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 
@@ -38,6 +39,7 @@ class TestingTaskManager(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     network: NetworkEnvironment,
+    taskManagerStateStore: TaskExecutorLocalStateStoresManager,
     numberOfSlots: Int,
     highAvailabilityServices: HighAvailabilityServices,
     taskManagerMetricGroup : TaskManagerMetricGroup)
@@ -48,6 +50,7 @@ class TestingTaskManager(
     memoryManager,
     ioManager,
     network,
+    taskManagerStateStore,
     numberOfSlots,
     highAvailabilityServices,
     taskManagerMetricGroup)
@@ -59,6 +62,7 @@ class TestingTaskManager(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     network: NetworkEnvironment,
+    taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager,
     numberOfSlots: Int,
     highAvailabilityServices: HighAvailabilityServices,
     taskManagerMetricGroup : TaskManagerMetricGroup) {
@@ -69,6 +73,7 @@ class TestingTaskManager(
       memoryManager,
       ioManager,
       network,
+      taskManagerLocalStateStoresManager,
       numberOfSlots,
       highAvailabilityServices,
       taskManagerMetricGroup)

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 4fc0283..344255f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -60,9 +60,11 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
@@ -212,6 +214,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /** Unique ID of this backend. */
        private UUID backendUID;
 
+       /** The configuration of local recovery. */
+       private final LocalRecoveryConfig localRecoveryConfig;
+
        public RocksDBKeyedStateBackend(
                String operatorIdentifier,
                ClassLoader userCodeClassLoader,
@@ -223,7 +228,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                int numberOfKeyGroups,
                KeyGroupRange keyGroupRange,
                ExecutionConfig executionConfig,
-               boolean enableIncrementalCheckpointing
+               boolean enableIncrementalCheckpointing,
+               LocalRecoveryConfig localRecoveryConfig
        ) throws IOException {
 
                super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange, executionConfig);
@@ -253,6 +259,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        String.format("Could not create RocksDB 
data directory at %s.", instanceBasePath.getAbsolutePath()));
                }
 
+               this.localRecoveryConfig = 
Preconditions.checkNotNull(localRecoveryConfig);
                this.keyGroupPrefixBytes = getNumberOfKeyGroups() > 
(Byte.MAX_VALUE + 1) ? 2 : 1;
                this.kvStateInformation = new HashMap<>();
                this.restoredKvStateMetaInfos = new HashMap<>();
@@ -365,10 +372,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         * @param streamFactory The factory that we can use for writing our state to streams.
         * @param checkpointOptions Options for how to perform this checkpoint.
         * @return Future to the state handle of the snapshot data.
-        * @throws Exception
+        * @throws Exception indicating a problem in the synchronous part of the checkpoint.
         */
        @Override
-       public RunnableFuture<KeyedStateHandle> snapshot(
+       public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
                final long checkpointId,
                final long timestamp,
                final CheckpointStreamFactory streamFactory,
@@ -382,7 +389,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
        }
 
-       private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
+       private RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshotIncrementally(
                final long checkpointId,
                final long checkpointTimestamp,
                final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
@@ -396,7 +403,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
                                                checkpointTimestamp);
                        }
-                       return DoneFuture.nullValue();
+                       return DoneFuture.of(SnapshotResult.empty());
                }
 
                final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
@@ -414,11 +421,12 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        throw e;
                }
 
-               return new FutureTask<KeyedStateHandle>(
-                       new Callable<KeyedStateHandle>() {
+               return new FutureTask<SnapshotResult<KeyedStateHandle>>(
+                       new Callable<SnapshotResult<KeyedStateHandle>>() {
                                @Override
-                               public KeyedStateHandle call() throws Exception 
{
-                                       return 
snapshotOperation.materializeSnapshot();
+                               public SnapshotResult<KeyedStateHandle> call() 
throws Exception {
+                                       KeyedStateHandle keyedStateHandle = 
snapshotOperation.materializeSnapshot();
+                                       return 
SnapshotResult.of(keyedStateHandle);
                                }
                        }
                ) {
@@ -435,7 +443,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                };
        }
 
-       private RunnableFuture<KeyedStateHandle> snapshotFully(
+       private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFully(
                final long checkpointId,
                final long timestamp,
                final CheckpointStreamFactory streamFactory) throws Exception {
@@ -450,15 +458,15 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.", timestamp);
                        }
 
-                       return DoneFuture.nullValue();
+                       return DoneFuture.of(SnapshotResult.empty());
                }
 
                snapshotOperation = new RocksDBFullSnapshotOperation<>(this, 
streamFactory, snapshotCloseableRegistry);
                snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
 
                // implementation of the async IO operation, based on FutureTask
-               AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable 
=
-                       new 
AbstractAsyncCallableWithResources<KeyedStateHandle>() {
+               
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable 
=
+                       new 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
 
                                @Override
                                protected void acquireResources() throws 
Exception {
@@ -493,7 +501,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                }
 
                                @Override
-                               public KeyGroupsStateHandle performOperation() 
throws Exception {
+                               public SnapshotResult<KeyedStateHandle> 
performOperation() throws Exception {
                                        long startTime = 
System.currentTimeMillis();
 
                                        if (isStopped()) {
@@ -505,7 +513,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        LOG.info("Asynchronous RocksDB snapshot 
({}, asynchronous part) in thread {} took {} ms.",
                                                streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - startTime));
 
-                                       return 
snapshotOperation.getSnapshotResultStateHandle();
+                                       KeyGroupsStateHandle 
snapshotResultStateHandle = snapshotOperation.getSnapshotResultStateHandle();
+                                       return 
SnapshotResult.of(snapshotResultStateHandle);
                                }
                        };
 

Reply via email to