http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
index 4277348..44ca9d8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -23,6 +23,8 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.RunnableFuture;
@@ -31,8 +33,11 @@ import java.util.concurrent.RunnableFuture;
  * This class is a default implementation for StateSnapshotContext.
  */
 public class StateSnapshotContextSynchronousImpl implements 
StateSnapshotContext, Closeable {
-       
+
+       /** Checkpoint id of the snapshot. */
        private final long checkpointId;
+
+       /** Checkpoint timestamp of the snapshot. */
        private final long checkpointTimestamp;
        
        /** Factory for he checkpointing stream */
@@ -47,7 +52,10 @@ public class StateSnapshotContextSynchronousImpl implements 
StateSnapshotContext
         */
        private final CloseableRegistry closableRegistry;
 
+       /** Output stream for the raw keyed state. */
        private KeyedStateCheckpointOutputStream 
keyedStateCheckpointOutputStream;
+
+       /** Output stream for the raw operator state. */
        private OperatorStateCheckpointOutputStream 
operatorStateCheckpointOutputStream;
 
        @VisibleForTesting
@@ -109,14 +117,23 @@ public class StateSnapshotContextSynchronousImpl 
implements StateSnapshotContext
                return operatorStateCheckpointOutputStream;
        }
 
-       public RunnableFuture<KeyedStateHandle> getKeyedStateStreamFuture() 
throws IOException {
-               KeyGroupsStateHandle keyGroupsStateHandle = 
closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream);
-               return new DoneFuture<KeyedStateHandle>(keyGroupsStateHandle);
+       @Nonnull
+       public RunnableFuture<SnapshotResult<KeyedStateHandle>> 
getKeyedStateStreamFuture() throws IOException {
+               KeyedStateHandle keyGroupsStateHandle =
+                       
closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream);
+               return toDoneFutureOfSnapshotResult(keyGroupsStateHandle);
        }
 
-       public RunnableFuture<OperatorStateHandle> 
getOperatorStateStreamFuture() throws IOException {
-               OperatorStateHandle operatorStateHandle = 
closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream);
-               return new DoneFuture<>(operatorStateHandle);
+       @Nonnull
+       public RunnableFuture<SnapshotResult<OperatorStateHandle>> 
getOperatorStateStreamFuture() throws IOException {
+               OperatorStateHandle operatorStateHandle =
+                       
closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream);
+               return toDoneFutureOfSnapshotResult(operatorStateHandle);
+       }
+
+       private <T extends StateObject> RunnableFuture<SnapshotResult<T>> 
toDoneFutureOfSnapshotResult(T handle) {
+               SnapshotResult<T> snapshotResult = SnapshotResult.of(handle);
+               return DoneFuture.of(snapshotResult);
        }
 
        private <T extends StreamStateHandle> T 
closeAndUnregisterStreamToObtainStateHandle(
@@ -130,7 +147,7 @@ public class StateSnapshotContextSynchronousImpl implements 
StateSnapshotContext
        }
 
        private <T extends StreamStateHandle> void closeAndUnregisterStream(
-               NonClosingCheckpointOutputStream<T> stream) throws IOException {
+               NonClosingCheckpointOutputStream<? extends T> stream) throws 
IOException {
 
                Preconditions.checkNotNull(stream);
 
@@ -149,9 +166,7 @@ public class StateSnapshotContextSynchronousImpl implements 
StateSnapshotContext
                        try {
                                
closeAndUnregisterStream(keyedStateCheckpointOutputStream);
                        } catch (IOException e) {
-                               exception = ExceptionUtils.firstOrSuppressed(
-                                       new IOException("Could not close the 
raw keyed state checkpoint output stream.", e),
-                                       exception);
+                               exception = new IOException("Could not close 
the raw keyed state checkpoint output stream.", e);
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index 326b95c..a940aef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -18,81 +18,241 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.ShutdownHookUtil;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
- * This class holds the all {@link TaskLocalStateStore} objects for a task 
executor (manager).
- *
- * TODO: this still still work in progress and partially still acts as a 
placeholder.
+ * This class holds all {@link TaskLocalStateStoreImpl} objects for a task executor (manager).
  */
 public class TaskExecutorLocalStateStoresManager {
 
+       /** Logger for this class. */
+       private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);
+
        /**
         * This map holds all local state stores for tasks running on the task 
manager / executor that own the instance of
-        * this.
+        * this. Maps from allocation id to the local state stores of all subtasks in that allocation.
         */
-       private final Map<JobID, Map<JobVertexSubtaskKey, TaskLocalStateStore>> 
taskStateManagers;
+       @GuardedBy("lock")
+       private final Map<AllocationID, Map<JobVertexSubtaskKey, 
TaskLocalStateStoreImpl>> taskStateStoresByAllocationID;
+
+       /** The configured mode for local recovery on this task manager. */
+       private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode;
+
+       /** These are the root directories for all local state of this task manager / executor. */
+       private final File[] localStateRootDirectories;
+
+       /** Executor that runs the discarding of released state objects. */
+       private final Executor discardExecutor;
+
+       /** Guarding lock for taskStateStoresByAllocationID and closed-flag. */
+       private final Object lock;
+
+       private final Thread shutdownHook;
+
+       @GuardedBy("lock")
+       private boolean closed;
+
+       public TaskExecutorLocalStateStoresManager(
+               @Nonnull LocalRecoveryConfig.LocalRecoveryMode 
localRecoveryMode,
+               @Nonnull File[] localStateRootDirectories,
+               @Nonnull Executor discardExecutor) throws IOException {
+
+               this.taskStateStoresByAllocationID = new HashMap<>();
+               this.localRecoveryMode = localRecoveryMode;
+               this.localStateRootDirectories = localStateRootDirectories;
+               this.discardExecutor = discardExecutor;
+               this.lock = new Object();
+               this.closed = false;
+
+               for (File localStateRecoveryRootDir : 
localStateRootDirectories) {
+                       if (!localStateRecoveryRootDir.exists()) {
+
+                               if (!localStateRecoveryRootDir.mkdirs()) {
+                                       throw new IOException("Could not create 
root directory for local recovery: " +
+                                               localStateRecoveryRootDir);
+                               }
+                       }
+               }
 
-       public TaskExecutorLocalStateStoresManager() {
-               this.taskStateManagers = new HashMap<>();
+               // register a shutdown hook
+               this.shutdownHook = 
ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), 
LOG);
        }
 
-       public TaskLocalStateStore localStateStoreForTask(
-               JobID jobId,
-               JobVertexID jobVertexID,
-               int subtaskIndex) {
+       @Nonnull
+       public TaskLocalStateStore localStateStoreForSubtask(
+               @Nonnull JobID jobId,
+               @Nonnull AllocationID allocationID,
+               @Nonnull JobVertexID jobVertexID,
+               @Nonnegative int subtaskIndex) {
+
+               synchronized (lock) {
+
+                       if (closed) {
+                               throw new 
IllegalStateException("TaskExecutorLocalStateStoresManager is already closed 
and cannot " +
+                                       "register a new TaskLocalStateStore.");
+                       }
+
+                       final Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> 
taskStateManagers =
+                               
this.taskStateStoresByAllocationID.computeIfAbsent(allocationID, k -> new 
HashMap<>());
+
+                       final JobVertexSubtaskKey taskKey = new 
JobVertexSubtaskKey(jobVertexID, subtaskIndex);
+
+                       // create the allocation base dirs, one inside each 
root dir.
+                       File[] allocationBaseDirectories = 
allocationBaseDirectories(allocationID);
 
-               Preconditions.checkNotNull(jobId);
-               final JobVertexSubtaskKey taskKey = new 
JobVertexSubtaskKey(jobVertexID, subtaskIndex);
+                       LocalRecoveryDirectoryProviderImpl directoryProvider = 
new LocalRecoveryDirectoryProviderImpl(
+                               allocationBaseDirectories,
+                               jobId,
+                               jobVertexID,
+                               subtaskIndex);
 
-               final Map<JobVertexSubtaskKey, TaskLocalStateStore> 
taskStateManagers =
-                       this.taskStateManagers.computeIfAbsent(jobId, k -> new 
HashMap<>());
+                       LocalRecoveryConfig localRecoveryConfig = new 
LocalRecoveryConfig(
+                               localRecoveryMode,
+                               directoryProvider);
 
-               return taskStateManagers.computeIfAbsent(
-                       taskKey, k -> new TaskLocalStateStore(jobId, 
jobVertexID, subtaskIndex));
+                       return taskStateManagers.computeIfAbsent(
+                               taskKey,
+                               k -> new TaskLocalStateStoreImpl(
+                                       jobId,
+                                       allocationID,
+                                       jobVertexID,
+                                       subtaskIndex,
+                                       localRecoveryConfig,
+                                       discardExecutor));
+               }
        }
 
-       public void releaseJob(JobID jobID) {
+       public void releaseLocalStateForAllocationId(@Nonnull AllocationID 
allocationID) {
+
+               Map<JobVertexSubtaskKey, TaskLocalStateStoreImpl> 
cleanupLocalStores;
 
-               Map<JobVertexSubtaskKey, TaskLocalStateStore> 
cleanupLocalStores = taskStateManagers.remove(jobID);
+               synchronized (lock) {
+                       if (closed) {
+                               return;
+                       }
+                       cleanupLocalStores = 
taskStateStoresByAllocationID.remove(allocationID);
+               }
 
                if (cleanupLocalStores != null) {
                        doRelease(cleanupLocalStores.values());
                }
+
+               cleanupAllocationBaseDirs(allocationID);
        }
 
-       public void releaseAll() {
+       public void shutdown() {
+
+               HashMap<AllocationID, Map<JobVertexSubtaskKey, 
TaskLocalStateStoreImpl>> toRelease;
+
+               synchronized (lock) {
 
-               for (Map<JobVertexSubtaskKey, TaskLocalStateStore> 
stateStoreMap : taskStateManagers.values()) {
-                       doRelease(stateStoreMap.values());
+                       if (closed) {
+                               return;
+                       }
+
+                       closed = true;
+                       toRelease = new 
HashMap<>(taskStateStoresByAllocationID);
+                       taskStateStoresByAllocationID.clear();
+               }
+
+               ShutdownHookUtil.removeShutdownHook(shutdownHook, 
getClass().getSimpleName(), LOG);
+
+               LOG.debug("Shutting down TaskExecutorLocalStateStoresManager.");
+
+               for (Map.Entry<AllocationID, Map<JobVertexSubtaskKey, 
TaskLocalStateStoreImpl>> entry :
+                       toRelease.entrySet()) {
+
+                       doRelease(entry.getValue().values());
+                       cleanupAllocationBaseDirs(entry.getKey());
                }
+       }
+
+       @VisibleForTesting
+       public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() {
+               return localRecoveryMode;
+       }
+
+       @VisibleForTesting
+       File[] getLocalStateRootDirectories() {
+               return localStateRootDirectories;
+       }
+
+       @VisibleForTesting
+       String allocationSubDirString(AllocationID allocationID) {
+               return "aid_" + allocationID;
+       }
 
-               taskStateManagers.clear();
+       private File[] allocationBaseDirectories(AllocationID allocationID) {
+               final String allocationSubDirString = 
allocationSubDirString(allocationID);
+               final File[] allocationDirectories = new 
File[localStateRootDirectories.length];
+               for (int i = 0; i < localStateRootDirectories.length; ++i) {
+                       allocationDirectories[i] = new 
File(localStateRootDirectories[i], allocationSubDirString);
+               }
+               return allocationDirectories;
        }
 
-       private void doRelease(Iterable<TaskLocalStateStore> toRelease) {
+       private void doRelease(Iterable<TaskLocalStateStoreImpl> toRelease) {
+
                if (toRelease != null) {
-                       for (TaskLocalStateStore stateStore : toRelease) {
-                               stateStore.dispose();
+
+                       for (TaskLocalStateStoreImpl stateStore : toRelease) {
+                               try {
+                                       stateStore.dispose();
+                               } catch (Exception disposeEx) {
+                                       LOG.warn("Exception while disposing 
local state store " + stateStore, disposeEx);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Deletes the base dirs for this allocation id (recursively).
+        */
+       private void cleanupAllocationBaseDirs(AllocationID allocationID) {
+               // clear the base dirs for this allocation id.
+               File[] allocationDirectories = 
allocationBaseDirectories(allocationID);
+               for (File directory : allocationDirectories) {
+                       try {
+                               FileUtils.deleteFileOrDirectory(directory);
+                       } catch (IOException e) {
+                               LOG.warn("Exception while deleting local state 
directory for allocation " + allocationID, e);
                        }
                }
        }
 
+       /**
+        * Composite key of {@link JobVertexID} and subtask index that 
describes the subtask of a job vertex.
+        */
        private static final class JobVertexSubtaskKey {
 
+               /** The job vertex id. */
                @Nonnull
                final JobVertexID jobVertexID;
+
+               /** The subtask index. */
+               @Nonnegative
                final int subtaskIndex;
 
-               public JobVertexSubtaskKey(@Nonnull JobVertexID jobVertexID, 
int subtaskIndex) {
-                       this.jobVertexID = 
Preconditions.checkNotNull(jobVertexID);
+               JobVertexSubtaskKey(@Nonnull JobVertexID jobVertexID, 
@Nonnegative int subtaskIndex) {
+                       this.jobVertexID = jobVertexID;
                        this.subtaskIndex = subtaskIndex;
                }
 
@@ -107,10 +267,7 @@ public class TaskExecutorLocalStateStoresManager {
 
                        JobVertexSubtaskKey that = (JobVertexSubtaskKey) o;
 
-                       if (subtaskIndex != that.subtaskIndex) {
-                               return false;
-                       }
-                       return jobVertexID.equals(that.jobVertexID);
+                       return subtaskIndex == that.subtaskIndex && 
jobVertexID.equals(that.jobVertexID);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
index f743630..7089894 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
@@ -18,44 +18,48 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 /**
- * This class will service as a task-manager-level local storage for local 
checkpointed state. The purpose is to provide
- * access to a state that is stored locally for a faster recovery compared to 
the state that is stored remotely in a
- * stable store DFS. For now, this storage is only complementary to the stable 
storage and local state is typically
- * lost in case of machine failures. In such cases (and others), client code 
of this class must fall back to using the
- * slower but highly available store.
- *
- * TODO this is currently a placeholder / mock that still must be implemented!
+ * Classes that implement this interface serve as a task-manager-level local storage for local checkpointed state.
+ * The purpose is to provide access to a state that is stored locally for a faster recovery compared to the state that
+ * is stored remotely in a stable store DFS. For now, this storage is only 
complementary to the stable storage and local
+ * state is typically lost in case of machine failures. In such cases (and 
others), client code of this class must fall
+ * back to using the slower but highly available store.
  */
-public class TaskLocalStateStore {
-
-       /** */
-       private final JobID jobID;
-
-       /** */
-       private final JobVertexID jobVertexID;
-
-       /** */
-       private final int subtaskIndex;
-
-       public TaskLocalStateStore(
-               JobID jobID,
-               JobVertexID jobVertexID,
-               int subtaskIndex) {
+public interface TaskLocalStateStore {
+       /**
+        * Stores the local state for the given checkpoint id.
+        *
+        * @param checkpointId id for the checkpoint that created the local 
state that will be stored.
+        * @param localState the local state to store.
+        */
+       void storeLocalState(
+               @Nonnegative long checkpointId,
+               @Nullable TaskStateSnapshot localState);
 
-               this.jobID = jobID;
-               this.jobVertexID = jobVertexID;
-               this.subtaskIndex = subtaskIndex;
-       }
+       /**
+        * Returns the local state that is stored under the given checkpoint id 
or null if nothing was stored under the id.
+        *
+        * @param checkpointID the checkpoint id by which we search for local 
state.
+        * @return the local state found for the given checkpoint id. Can be 
null
+        */
+       @Nullable
+       TaskStateSnapshot retrieveLocalState(long checkpointID);
 
-       public void storeSnapshot(/* TODO */) {
-               throw new UnsupportedOperationException("TODO!");
-       }
+       /**
+        * Returns the {@link LocalRecoveryConfig} for this task local state 
store.
+        */
+       @Nonnull
+       LocalRecoveryConfig getLocalRecoveryConfig();
 
-       public void dispose() {
-               throw new UnsupportedOperationException("TODO!");
-       }
+       /**
+        * Notifies that the checkpoint with the given id was confirmed as 
complete. This prunes the checkpoint history
+        * and removes all local states with a checkpoint id that is smaller 
than the newly confirmed checkpoint id.
+        */
+       void confirmCheckpoint(long confirmedCheckpointId);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
new file mode 100644
index 0000000..191c109
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/**
+ * Main implementation of a {@link TaskLocalStateStore}.
+ */
+public class TaskLocalStateStoreImpl implements TaskLocalStateStore {
+
+       /** Logger for this class. */
+       private static final Logger LOG = 
LoggerFactory.getLogger(TaskLocalStateStoreImpl.class);
+
+       /** Dummy value that is stored in place of a null snapshot, so the storage map never holds null values. */
+       private static final TaskStateSnapshot NULL_DUMMY = new 
TaskStateSnapshot(0);
+
+       /** JobID from the owning subtask. */
+       @Nonnull
+       private final JobID jobID;
+
+       /** AllocationID of the owning slot. */
+       @Nonnull
+       private final AllocationID allocationID;
+
+       /** JobVertexID of the owning subtask. */
+       @Nonnull
+       private final JobVertexID jobVertexID;
+
+       /** Subtask index of the owning subtask. */
+       @Nonnegative
+       private final int subtaskIndex;
+
+       /** The configured mode for local recovery. */
+       @Nonnull
+       private final LocalRecoveryConfig localRecoveryConfig;
+
+       /** Executor that runs the discarding of released state objects. */
+       @Nonnull
+       private final Executor discardExecutor;
+
+       /** Lock for synchronisation on the storage map and the discarded 
status. */
+       @Nonnull
+       private final Object lock;
+
+       /** Status flag if this store was already discarded. */
+       @GuardedBy("lock")
+       private boolean disposed;
+
+       /** Maps checkpoint ids to local TaskStateSnapshots. */
+       @Nonnull
+       @GuardedBy("lock")
+       private final SortedMap<Long, TaskStateSnapshot> 
storedTaskStateByCheckpointID;
+
+       public TaskLocalStateStoreImpl(
+               @Nonnull JobID jobID,
+               @Nonnull AllocationID allocationID,
+               @Nonnull JobVertexID jobVertexID,
+               @Nonnegative int subtaskIndex,
+               @Nonnull LocalRecoveryConfig localRecoveryConfig,
+               @Nonnull Executor discardExecutor) {
+
+               this.jobID = jobID;
+               this.allocationID = allocationID;
+               this.jobVertexID = jobVertexID;
+               this.subtaskIndex = subtaskIndex;
+               this.discardExecutor = discardExecutor;
+               this.lock = new Object();
+               this.storedTaskStateByCheckpointID = new TreeMap<>();
+               this.disposed = false;
+               this.localRecoveryConfig = localRecoveryConfig;
+       }
+
+       @Override
+       public void storeLocalState(
+               @Nonnegative long checkpointId,
+               @Nullable TaskStateSnapshot localState) {
+
+               if (localState == null) {
+                       localState = NULL_DUMMY;
+               }
+
+               LOG.info("Storing local state for checkpoint {}.", 
checkpointId);
+               LOG.debug("Local state for checkpoint {} is {}.", checkpointId, 
localState);
+
+               Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
+
+               synchronized (lock) {
+                       if (disposed) {
+                               // we ignore late stores and simply discard the 
state.
+                               toDiscard.put(checkpointId, localState);
+                       } else {
+                               TaskStateSnapshot previous =
+                                       
storedTaskStateByCheckpointID.put(checkpointId, localState);
+
+                               if (previous != null) {
+                                       toDiscard.put(checkpointId, previous);
+                               }
+                       }
+               }
+
+               asyncDiscardLocalStateForCollection(toDiscard.entrySet());
+       }
+
+       @Override
+       @Nullable
+       public TaskStateSnapshot retrieveLocalState(long checkpointID) {
+               synchronized (lock) {
+                       TaskStateSnapshot snapshot = 
storedTaskStateByCheckpointID.get(checkpointID);
+                       return snapshot != NULL_DUMMY ? snapshot : null;
+               }
+       }
+
+       @Override
+       @Nonnull
+       public LocalRecoveryConfig getLocalRecoveryConfig() {
+               return localRecoveryConfig;
+       }
+
+       @Override
+       public void confirmCheckpoint(long confirmedCheckpointId) {
+
+               LOG.debug("Received confirmation for checkpoint {}. Starting to 
prune history.", confirmedCheckpointId);
+
+               final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new 
ArrayList<>();
+
+               synchronized (lock) {
+
+                       Iterator<Map.Entry<Long, TaskStateSnapshot>> 
entryIterator =
+                               
storedTaskStateByCheckpointID.entrySet().iterator();
+
+                       // remove entries for outdated checkpoints and discard 
their state.
+                       while (entryIterator.hasNext()) {
+
+                               Map.Entry<Long, TaskStateSnapshot> 
snapshotEntry = entryIterator.next();
+                               long entryCheckpointId = snapshotEntry.getKey();
+
+                               if (entryCheckpointId < confirmedCheckpointId) {
+                                       toRemove.add(snapshotEntry);
+                                       entryIterator.remove();
+                               } else {
+                                       // we can stop because the map is 
sorted.
+                                       break;
+                               }
+                       }
+               }
+
+               asyncDiscardLocalStateForCollection(toRemove);
+       }
+
+       /**
+        * Disposes the state of all local snapshots managed by this object.
+        */
+       public CompletableFuture<Void> dispose() {
+
+               Collection<Map.Entry<Long, TaskStateSnapshot>> statesCopy;
+
+               synchronized (lock) {
+                       disposed = true;
+                       statesCopy = new 
ArrayList<>(storedTaskStateByCheckpointID.entrySet());
+                       storedTaskStateByCheckpointID.clear();
+               }
+
+               return CompletableFuture.runAsync(
+                       () -> {
+                               // discard all remaining state objects.
+                               syncDiscardLocalStateForCollection(statesCopy);
+
+                               // delete the local state subdirectories that belong to this subtask.
+                               LocalRecoveryDirectoryProvider 
directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
+                               for (int i = 0; i < 
directoryProvider.allocationBaseDirsCount(); ++i) {
+                                       File subtaskBaseDirectory = 
directoryProvider.selectSubtaskBaseDirectory(i);
+                                       try {
+                                               
deleteDirectory(subtaskBaseDirectory);
+                                       } catch (IOException e) {
+                                               LOG.warn("Exception when 
deleting local recovery subtask base dir: " + subtaskBaseDirectory, e);
+                                       }
+                               }
+                       },
+                       discardExecutor);
+       }
+
+       private void 
asyncDiscardLocalStateForCollection(Collection<Map.Entry<Long, 
TaskStateSnapshot>> toDiscard) {
+               if (!toDiscard.isEmpty()) {
+                       discardExecutor.execute(() -> 
syncDiscardLocalStateForCollection(toDiscard));
+               }
+       }
+
+       private void 
syncDiscardLocalStateForCollection(Collection<Map.Entry<Long, 
TaskStateSnapshot>> toDiscard) {
+               for (Map.Entry<Long, TaskStateSnapshot> entry : toDiscard) {
+                       discardLocalStateForCheckpoint(entry.getKey(), 
entry.getValue());
+               }
+       }
+
+       /**
+        * Discards the given snapshot and deletes its local checkpoint directory in the calling thread; failures are only logged.
+        */
+       private void discardLocalStateForCheckpoint(long checkpointID, TaskStateSnapshot o) {
+
+               try {
+                       if (LOG.isTraceEnabled()) {
+                               LOG.trace("Discarding local task state snapshot of checkpoint {} for {}/{}/{}.",
+                                       checkpointID, jobID, jobVertexID, subtaskIndex);
+                       } else {
+                               LOG.debug("Discarding local task state snapshot {} of checkpoint {} for {}/{}/{}.",
+                                       o, checkpointID, jobID, jobVertexID, subtaskIndex);
+                       }
+                       o.discardState();
+               } catch (Exception discardEx) {
+                       LOG.warn("Exception while discarding local task state snapshot of checkpoint " + checkpointID + ".", discardEx);
+               }
+
+               LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
+               File checkpointDir = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointID);
+               LOG.debug("Deleting local state directory {} of checkpoint {} for {}/{}/{}.",
+                       checkpointDir, checkpointID, jobID, jobVertexID, subtaskIndex);
+               try {
+                       deleteDirectory(checkpointDir);
+               } catch (IOException ex) {
+                       LOG.warn("Exception while deleting local state directory of checkpoint " + checkpointID + ".", ex);
+               }
+       }
+
+       /**
+        * Helper method to delete a directory.
+        */
+       private void deleteDirectory(File directory) throws IOException {
+               Path path = new Path(directory.toURI());
+               FileSystem fileSystem = path.getFileSystem();
+               if (fileSystem.exists(path)) {
+                       fileSystem.delete(path, true);
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "TaskLocalStateStore{" +
+                       "jobID=" + jobID +
+                       ", jobVertexID=" + jobVertexID +
+                       ", allocationID=" + allocationID +
+                       ", subtaskIndex=" + subtaskIndex +
+                       ", localRecoveryConfig=" + localRecoveryConfig +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
index 8b41e9e..82591ba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 
@@ -44,19 +44,29 @@ public interface TaskStateManager extends 
CheckpointListener {
         * Report the state snapshots for the operator instances running in the 
owning task.
         *
         * @param checkpointMetaData meta data from the checkpoint request.
-        * @param checkpointMetrics task level metrics for the checkpoint.
-        * @param acknowledgedState the reported states from the owning task.
+        * @param checkpointMetrics  task level metrics for the checkpoint.
+        * @param acknowledgedState  the reported states to acknowledge to the 
job manager.
+        * @param localState         the reported states for local recovery.
         */
-       void reportStateHandles(
+       void reportTaskStateSnapshots(
                @Nonnull CheckpointMetaData checkpointMetaData,
                @Nonnull CheckpointMetrics checkpointMetrics,
-               @Nullable TaskStateSnapshot acknowledgedState);
+               @Nullable TaskStateSnapshot acknowledgedState,
+               @Nullable TaskStateSnapshot localState);
 
        /**
         * Returns means to restore previously reported state of an operator 
running in the owning task.
         *
         * @param operatorID the id of the operator for which we request state.
-        * @return previous state for the operator. Null if no previous state 
exists.
+        * @return Previous state for the operator. The previous state can be 
empty if the operator had no previous state.
         */
-       OperatorSubtaskState operatorStates(OperatorID operatorID);
+       @Nonnull
+       PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID 
operatorID);
+
+       /**
+        * Returns the configuration for local recovery, i.e. the base 
directories for all file-based local state of the
+        * owning subtask and the general mode for local recovery.
+        */
+       @Nonnull
+       LocalRecoveryConfig createLocalRecoveryConfig();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index 3cd66fb..3acca7c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -31,6 +32,8 @@ import 
org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.Collections;
+
 /**
  * This class is the default implementation of {@link TaskStateManager} and 
collaborates with the job manager
  * through {@link CheckpointResponder}) as well as a task-manager-local state 
store. Like this, client code does
@@ -75,35 +78,67 @@ public class TaskStateManagerImpl implements 
TaskStateManager {
        }
 
        @Override
-       public void reportStateHandles(
+       public void reportTaskStateSnapshots(
                @Nonnull CheckpointMetaData checkpointMetaData,
                @Nonnull CheckpointMetrics checkpointMetrics,
-               @Nullable TaskStateSnapshot acknowledgedState) {
+               @Nullable TaskStateSnapshot acknowledgedState,
+               @Nullable TaskStateSnapshot localState) {
+
+               long checkpointId = checkpointMetaData.getCheckpointId();
+
+               localStateStore.storeLocalState(checkpointId, localState);
 
                checkpointResponder.acknowledgeCheckpoint(
                        jobId,
                        executionAttemptID,
-                       checkpointMetaData.getCheckpointId(),
+                       checkpointId,
                        checkpointMetrics,
                        acknowledgedState);
        }
 
+       @Nonnull
        @Override
-       public OperatorSubtaskState operatorStates(OperatorID operatorID) {
+       public PrioritizedOperatorSubtaskState 
prioritizedOperatorState(OperatorID operatorID) {
 
                if (jobManagerTaskRestore == null) {
-                       return null;
+                       return 
PrioritizedOperatorSubtaskState.emptyNotRestored();
+               }
+
+               TaskStateSnapshot jobManagerStateSnapshot =
+                       jobManagerTaskRestore.getTaskStateSnapshot();
+
+               OperatorSubtaskState jobManagerSubtaskState =
+                       
jobManagerStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+
+               if (jobManagerSubtaskState == null) {
+                       return 
PrioritizedOperatorSubtaskState.emptyNotRestored();
                }
 
-               TaskStateSnapshot taskStateSnapshot = 
jobManagerTaskRestore.getTaskStateSnapshot();
-               return 
taskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+               TaskStateSnapshot localStateSnapshot =
+                       
localStateStore.retrieveLocalState(jobManagerTaskRestore.getRestoreCheckpointId());
 
-               /*
-               TODO!!!!!!!
-               1) lookup local states for a matching operatorID / checkpointID.
-               2) if nothing available: look into job manager provided state.
-               3) massage it into a snapshots and return stuff.
-                */
+               if (localStateSnapshot != null) {
+                       OperatorSubtaskState localSubtaskState = 
localStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+
+                       if (localSubtaskState != null) {
+                               PrioritizedOperatorSubtaskState.Builder builder 
= new PrioritizedOperatorSubtaskState.Builder(
+                                       jobManagerSubtaskState,
+                                       
Collections.singletonList(localSubtaskState));
+                               return builder.build();
+                       }
+               }
+
+               PrioritizedOperatorSubtaskState.Builder builder = new 
PrioritizedOperatorSubtaskState.Builder(
+                       jobManagerSubtaskState,
+                       Collections.emptyList(),
+                       true);
+               return builder.build();
+       }
+
+       @Nonnull
+       @Override
+       public LocalRecoveryConfig createLocalRecoveryConfig() {
+               return localStateStore.getLocalRecoveryConfig();
        }
 
        /**
@@ -111,6 +146,6 @@ public class TaskStateManagerImpl implements 
TaskStateManager {
         */
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
-               //TODO activate and prune local state later
+               localStateStore.confirmCheckpoint(checkpointId);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
new file mode 100644
index 0000000..054a98c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link CheckpointStateOutputStream} that writes into a specified file and
+ * returns a {@link FileStateHandle} upon closing.
+ *
+ * <p>Unlike the {@link org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream},
+ * this stream does not have a threshold below which it returns a memory byte stream handle,
+ * and does not create random files, but writes to a specified file.
+ *
+ * <p>There are two ways to finish the stream: {@link #closeAndGetHandle()} closes it and hands out a
+ * handle to the written file, whereas {@link #close()} discards it and deletes the file.
+ */
+public final class FileBasedStateOutputStream extends CheckpointStateOutputStream {
+
+       private static final Logger LOG = LoggerFactory.getLogger(FileBasedStateOutputStream.class);
+
+       // ------------------------------------------------------------------------
+
+       /** The stream to the target file; all write/flush/sync calls are forwarded to it. */
+       private final FSDataOutputStream out;
+
+       /** The path of the file that this stream writes to. */
+       private final Path path;
+
+       /** The file system used to create the file and to delete it when the stream is discarded. */
+       private final FileSystem fileSystem;
+
+       /** Set once the stream was finished through {@link #close()} or {@link #closeAndGetHandle()}. */
+       private volatile boolean closed;
+
+       /**
+        * Opens the target file for writing, using {@link WriteMode#NO_OVERWRITE}.
+        *
+        * @param fileSystem the file system on which the file is created.
+        * @param path the path of the file to write to.
+        * @throws IOException if the file system cannot create the file.
+        */
+       public FileBasedStateOutputStream(FileSystem fileSystem, Path path) throws IOException {
+               this.fileSystem = checkNotNull(fileSystem);
+               this.path = checkNotNull(path);
+
+               this.out = fileSystem.create(path, WriteMode.NO_OVERWRITE);
+       }
+
+       // ------------------------------------------------------------------------
+       //  I/O
+       // ------------------------------------------------------------------------
+
+       @Override
+       public final void write(int b) throws IOException {
+               out.write(b);
+       }
+
+       @Override
+       public final void write(@Nonnull byte[] b, int off, int len) throws IOException {
+               out.write(b, off, len);
+       }
+
+       @Override
+       public long getPos() throws IOException {
+               return out.getPos();
+       }
+
+       @Override
+       public void flush() throws IOException {
+               out.flush();
+       }
+
+       @Override
+       public void sync() throws IOException {
+               out.sync();
+       }
+
+       // ------------------------------------------------------------------------
+       //  Closing
+       // ------------------------------------------------------------------------
+
+       /**
+        * Checks whether the stream was already finished.
+        *
+        * @return true after {@link #close()} or {@link #closeAndGetHandle()} was called, false before.
+        */
+       public boolean isClosed() {
+               return closed;
+       }
+
+       /**
+        * Discards the stream: closes the underlying stream and deletes the target file. Failures of
+        * either step are logged and not propagated. If the stream was already finished (including
+        * through {@link #closeAndGetHandle()}), this call does nothing, so a file for which a handle
+        * was handed out is not deleted here.
+        */
+       @Override
+       public void close() {
+               if (!closed) {
+                       closed = true;
+
+                       try {
+                               out.close();
+                       }
+                       catch (Throwable t) {
+                               LOG.warn("Could not close the state stream for {}.", path, t);
+                       }
+
+                       // Delete in a separate step: a failure to close must not skip the cleanup,
+                       // otherwise a discarded stream leaves its file behind.
+                       try {
+                               fileSystem.delete(path, false);
+                       }
+                       catch (Throwable t) {
+                               LOG.warn("Could not delete the checkpoint stream file {}.", path, t);
+                       }
+               }
+       }
+
+       /**
+        * Closes the stream and returns a handle to the written file. If closing fails, the file is
+        * deleted (best effort) before the failure is reported.
+        *
+        * @return a {@link FileStateHandle} for the target path, carrying the stream position before
+        *         closing as size (0 if the position could not be determined).
+        * @throws IOException if the stream was already closed, or if closing the underlying stream fails.
+        */
+       @Nullable
+       @Override
+       public FileStateHandle closeAndGetHandle() throws IOException {
+               synchronized (this) {
+                       if (!closed) {
+                               try {
+                                       // make a best effort attempt to figure out the size
+                                       long size = 0;
+                                       try {
+                                               size = out.getPos();
+                                       } catch (Exception ignored) {}
+
+                                       // close and return
+                                       out.close();
+
+                                       return new FileStateHandle(path, size);
+                               }
+                               catch (Exception e) {
+                                       try {
+                                               fileSystem.delete(path, false);
+                                       }
+                                       catch (Exception deleteException) {
+                                               LOG.warn("Could not delete the checkpoint stream file {}.", path, deleteException);
+                                       }
+
+                                       throw new IOException("Could not flush and close the file system " +
+                                               "output stream to " + path + " in order to obtain the " +
+                                               "stream state handle", e);
+                               }
+                               finally {
+                                       closed = true;
+                               }
+                       }
+                       else {
+                               throw new IOException("Stream has already been closed and discarded.");
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index f993786..609ef69 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -29,6 +29,8 @@ import 
org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.UUID;
@@ -115,8 +117,10 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
        // 
------------------------------------------------------------------------
 
        @Override
-       public FsCheckpointStateOutputStream 
createCheckpointStateOutputStream(CheckpointedStateScope scope) throws 
Exception {
-               Path target = scope == CheckpointedStateScope.EXCLUSIVE ? 
checkpointDirectory : sharedStateDirectory;
+       public FsCheckpointStateOutputStream 
createCheckpointStateOutputStream(CheckpointedStateScope scope) throws 
IOException {
+
+
+               Path target = scope == CheckpointedStateScope.EXCLUSIVE 
?checkpointDirectory: sharedStateDirectory;
                int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, 
fileStateThreshold);
 
                return new FsCheckpointStateOutputStream(target, filesystem, 
bufferSize, fileStateThreshold);
@@ -275,6 +279,7 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
                        }
                }
 
+               @Nullable
                @Override
                public StreamStateHandle closeAndGetHandle() throws IOException 
{
                        // check if there was nothing ever written

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 58791e2..637effd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -32,7 +32,9 @@ import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.util.TernaryBoolean;
 
@@ -102,7 +104,7 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
         * A value of 'undefined' means not yet configured, in which case the 
default will be used. */
        private final TernaryBoolean asynchronousSnapshots;
 
-       // 
------------------------------------------------------------------------
+       // 
-----------------------------------------------------------------------
 
        /**
         * Creates a new state backend that stores its checkpoint data in the 
file system and location
@@ -451,7 +453,10 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
                        TypeSerializer<K> keySerializer,
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry) throws IOException 
{
+                       TaskKvStateRegistry kvStateRegistry) {
+
+               TaskStateManager taskStateManager = env.getTaskStateManager();
+               LocalRecoveryConfig localRecoveryConfig = 
taskStateManager.createLocalRecoveryConfig();
 
                return new HeapKeyedStateBackend<>(
                                kvStateRegistry,
@@ -460,13 +465,14 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
                                numberOfKeyGroups,
                                keyGroupRange,
                                isUsingAsynchronousSnapshots(),
-                               env.getExecutionConfig());
+                               env.getExecutionConfig(),
+                               localRecoveryConfig);
        }
 
        @Override
        public OperatorStateBackend createOperatorStateBackend(
                Environment env,
-               String operatorIdentifier) throws Exception {
+               String operatorIdentifier) {
 
                return new DefaultOperatorStateBackend(
                        env.getUserClassLoader(),

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index aa7ea6e..5d5f716 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -49,8 +49,10 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
@@ -117,13 +119,14 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        private final boolean asynchronousSnapshots;
 
        public HeapKeyedStateBackend(
-                       TaskKvStateRegistry kvStateRegistry,
-                       TypeSerializer<K> keySerializer,
-                       ClassLoader userCodeClassLoader,
-                       int numberOfKeyGroups,
-                       KeyGroupRange keyGroupRange,
-                       boolean asynchronousSnapshots,
-                       ExecutionConfig executionConfig) {
+               TaskKvStateRegistry kvStateRegistry,
+               TypeSerializer<K> keySerializer,
+               ClassLoader userCodeClassLoader,
+               int numberOfKeyGroups,
+               KeyGroupRange keyGroupRange,
+               boolean asynchronousSnapshots,
+               ExecutionConfig executionConfig,
+               LocalRecoveryConfig localRecoveryConfig) {
 
                super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange, executionConfig);
                this.asynchronousSnapshots = asynchronousSnapshots;
@@ -286,14 +289,14 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
        @Override
        @SuppressWarnings("unchecked")
-       public  RunnableFuture<KeyedStateHandle> snapshot(
+       public  RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
                        final long checkpointId,
                        final long timestamp,
                        final CheckpointStreamFactory streamFactory,
                        CheckpointOptions checkpointOptions) throws Exception {
 
                if (!hasRegisteredState()) {
-                       return DoneFuture.nullValue();
+                       return DoneFuture.of(SnapshotResult.empty());
                }
 
                long syncStartTime = System.currentTimeMillis();
@@ -326,8 +329,8 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                //--------------------------------------------------- this 
becomes the end of sync part
 
                // implementation of the async IO operation, based on FutureTask
-               final AbstractAsyncCallableWithResources<KeyedStateHandle> 
ioCallable =
-                       new 
AbstractAsyncCallableWithResources<KeyedStateHandle>() {
+               final 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable 
=
+                       new 
AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
 
                                
CheckpointStreamFactory.CheckpointStateOutputStream stream = null;
 
@@ -359,7 +362,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                }
 
                                @Override
-                               public KeyGroupsStateHandle performOperation() 
throws Exception {
+                               public SnapshotResult<KeyedStateHandle> 
performOperation() throws Exception {
                                        long asyncStartTime = 
System.currentTimeMillis();
 
                                        
CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.stream;
@@ -401,15 +404,15 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                        final 
KeyGroupsStateHandle keyGroupsStateHandle =
                                                                new 
KeyGroupsStateHandle(offsets, streamStateHandle);
 
-                                                       return 
keyGroupsStateHandle;
+                                                       return 
SnapshotResult.of(keyGroupsStateHandle);
                                                }
                                        }
 
-                                       return null;
+                                       return SnapshotResult.empty();
                                }
                        };
 
-               AsyncStoppableTaskWithCallback<KeyedStateHandle> task = 
AsyncStoppableTaskWithCallback.from(ioCallable);
+               
AsyncStoppableTaskWithCallback<SnapshotResult<KeyedStateHandle>> task = 
AsyncStoppableTaskWithCallback.from(ioCallable);
 
                if (!asynchronousSnapshots) {
                        task.run();

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 168e4ff..0801429 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,7 +49,8 @@ public class MemCheckpointStreamFactory implements 
CheckpointStreamFactory {
 
        @Override
        public CheckpointStateOutputStream createCheckpointStateOutputStream(
-                       CheckpointedStateScope scope) throws Exception {
+                       CheckpointedStateScope scope) throws IOException
+       {
                return new MemoryCheckpointOutputStream(maxStateSize);
        }
 
@@ -114,6 +117,7 @@ public class MemCheckpointStreamFactory implements 
CheckpointStreamFactory {
                        }
                }
 
+               @Nullable
                @Override
                public StreamStateHandle closeAndGetHandle() throws IOException 
{
                        if (isEmpty) {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 88d7b01..3da60e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.util.TernaryBoolean;
@@ -299,13 +300,16 @@ public class MemoryStateBackend extends 
AbstractFileStateBackend implements Conf
 
        @Override
        public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
-                       Environment env, JobID jobID,
+                       Environment env,
+                       JobID jobID,
                        String operatorIdentifier,
                        TypeSerializer<K> keySerializer,
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        TaskKvStateRegistry kvStateRegistry) {
 
+               TaskStateManager taskStateManager = env.getTaskStateManager();
+
                return new HeapKeyedStateBackend<>(
                                kvStateRegistry,
                                keySerializer,
@@ -313,7 +317,8 @@ public class MemoryStateBackend extends 
AbstractFileStateBackend implements Conf
                                numberOfKeyGroups,
                                keyGroupRange,
                                isUsingAsynchronousSnapshots(),
-                               env.getExecutionConfig());
+                               env.getExecutionConfig(),
+                               taskStateManager.createLocalRecoveryConfig());
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f4c953d..927bd11 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -205,7 +205,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                this.jobManagerTable = 
taskExecutorServices.getJobManagerTable();
                this.jobLeaderService = 
taskExecutorServices.getJobLeaderService();
                this.taskManagerLocation = 
taskExecutorServices.getTaskManagerLocation();
-               this.localStateStoresManager = 
taskExecutorServices.getTaskStateManager();
+               this.localStateStoresManager = 
taskExecutorServices.getTaskManagerStateStore();
                this.networkEnvironment = 
taskExecutorServices.getNetworkEnvironment();
 
                this.jobManagerConnections = new HashMap<>(4);
@@ -452,8 +452,9 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier = 
jobManagerConnection.getResultPartitionConsumableNotifier();
                        PartitionProducerStateChecker partitionStateChecker = 
jobManagerConnection.getPartitionStateChecker();
 
-                       final TaskLocalStateStore localStateStore = 
localStateStoresManager.localStateStoreForTask(
+                       final TaskLocalStateStore localStateStore = 
localStateStoresManager.localStateStoreForSubtask(
                                jobId,
+                               tdd.getAllocationId(),
                                taskInformation.getJobVertexId(),
                                tdd.getSubtaskIndex());
 
@@ -744,6 +745,9 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                                
onFatalError(slotNotFoundException);
                                        }
 
+                                       // release local state under the 
allocation id.
+                                       
localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
+
                                        // sanity check
                                        if 
(!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
                                                onFatalError(new 
Exception("Could not free slot " + slotId));
@@ -1271,6 +1275,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                } catch (SlotNotFoundException e) {
                        log.debug("Could not free slot for allocation id {}.", 
allocationId, e);
                }
+
+               
localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
        }
 
        private void timeoutSlot(AllocationID allocationId, UUID ticket) {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 2de1be8..08335b2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -303,6 +303,7 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
                TaskManagerServices taskManagerServices = 
TaskManagerServices.fromConfiguration(
                        taskManagerServicesConfiguration,
                        resourceID,
+                       rpcService.getExecutor(), // TODO replace this later 
with some dedicated executor for io.
                        
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
                        EnvironmentInformation.getMaxJvmHeapMemory());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index b710b6a..32c7ff7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.query.QueryableStateUtils;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -58,6 +60,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
@@ -68,6 +71,9 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 public class TaskManagerServices {
        private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerServices.class);
 
+       @VisibleForTesting
+       public static final String LOCAL_STATE_SUB_DIRECTORY_ROOT = 
"localState";
+
        /** TaskManager services. */
        private final TaskManagerLocation taskManagerLocation;
        private final MemoryManager memoryManager;
@@ -78,7 +84,7 @@ public class TaskManagerServices {
        private final TaskSlotTable taskSlotTable;
        private final JobManagerTable jobManagerTable;
        private final JobLeaderService jobLeaderService;
-       private final TaskExecutorLocalStateStoresManager taskStateManager;
+       private final TaskExecutorLocalStateStoresManager taskManagerStateStore;
 
        TaskManagerServices(
                TaskManagerLocation taskManagerLocation,
@@ -90,7 +96,7 @@ public class TaskManagerServices {
                TaskSlotTable taskSlotTable,
                JobManagerTable jobManagerTable,
                JobLeaderService jobLeaderService,
-               TaskExecutorLocalStateStoresManager taskStateManager) {
+               TaskExecutorLocalStateStoresManager taskManagerStateStore) {
 
                this.taskManagerLocation = 
Preconditions.checkNotNull(taskManagerLocation);
                this.memoryManager = Preconditions.checkNotNull(memoryManager);
@@ -101,7 +107,7 @@ public class TaskManagerServices {
                this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
                this.jobManagerTable = 
Preconditions.checkNotNull(jobManagerTable);
                this.jobLeaderService = 
Preconditions.checkNotNull(jobLeaderService);
-               this.taskStateManager = 
Preconditions.checkNotNull(taskStateManager);
+               this.taskManagerStateStore = 
Preconditions.checkNotNull(taskManagerStateStore);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -144,8 +150,8 @@ public class TaskManagerServices {
                return jobLeaderService;
        }
 
-       public TaskExecutorLocalStateStoresManager getTaskStateManager() {
-               return taskStateManager;
+       public TaskExecutorLocalStateStoresManager getTaskManagerStateStore() {
+               return taskManagerStateStore;
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -160,6 +166,12 @@ public class TaskManagerServices {
                Exception exception = null;
 
                try {
+                       taskManagerStateStore.shutdown();
+               } catch (Exception e) {
+                       exception = e;
+               }
+
+               try {
                        memoryManager.shutdown();
                } catch (Exception e) {
                        exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
@@ -209,6 +221,7 @@ public class TaskManagerServices {
         *
         * @param resourceID resource ID of the task manager
         * @param taskManagerServicesConfiguration task manager configuration
+        * @param taskIOExecutor executor for async IO operations.
         * @param freeHeapMemoryWithDefrag an estimate of the size of the free 
heap memory
         * @param maxJvmHeapMemory the maximum JVM heap size
         * @return task manager components
@@ -217,6 +230,7 @@ public class TaskManagerServices {
        public static TaskManagerServices fromConfiguration(
                        TaskManagerServicesConfiguration 
taskManagerServicesConfiguration,
                        ResourceID resourceID,
+                       Executor taskIOExecutor,
                        long freeHeapMemoryWithDefrag,
                        long maxJvmHeapMemory) throws Exception {
 
@@ -256,7 +270,20 @@ public class TaskManagerServices {
                final JobManagerTable jobManagerTable = new JobManagerTable();
 
                final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
-               final TaskExecutorLocalStateStoresManager taskStateManager = 
new TaskExecutorLocalStateStoresManager();
+
+               LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode = 
taskManagerServicesConfiguration.getLocalRecoveryMode();
+
+               final String[] stateRootDirectoryStrings = 
taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
+
+               final File[] stateRootDirectoryFiles = new 
File[stateRootDirectoryStrings.length];
+
+               for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
+                       stateRootDirectoryFiles[i] = new 
File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
+               }
+
+               final TaskExecutorLocalStateStoresManager taskStateManager =
+                       new 
TaskExecutorLocalStateStoresManager(localRecoveryMode, stateRootDirectoryFiles, 
taskIOExecutor);
+
                return new TaskManagerServices(
                        taskManagerLocation,
                        memoryManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 07cf660..d029bc5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.NetUtils;
@@ -54,6 +55,8 @@ public class TaskManagerServicesConfiguration {
 
        private final String[] tmpDirPaths;
 
+       private final String[] localRecoveryStateRootDirectories;
+
        private final int numberOfSlots;
 
        private final NetworkEnvironmentConfiguration networkConfig;
@@ -75,9 +78,13 @@ public class TaskManagerServicesConfiguration {
 
        private final long timerServiceShutdownTimeout;
 
+       private final LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode;
+
        public TaskManagerServicesConfiguration(
                        InetAddress taskManagerAddress,
                        String[] tmpDirPaths,
+                       String[] localRecoveryStateRootDirectories,
+                       LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode,
                        NetworkEnvironmentConfiguration networkConfig,
                        QueryableStateConfiguration queryableStateConfig,
                        int numberOfSlots,
@@ -89,6 +96,8 @@ public class TaskManagerServicesConfiguration {
 
                this.taskManagerAddress = checkNotNull(taskManagerAddress);
                this.tmpDirPaths = checkNotNull(tmpDirPaths);
+               this.localRecoveryStateRootDirectories = 
checkNotNull(localRecoveryStateRootDirectories);
+               this.localRecoveryMode = checkNotNull(localRecoveryMode);
                this.networkConfig = checkNotNull(networkConfig);
                this.queryableStateConfig = checkNotNull(queryableStateConfig);
                this.numberOfSlots = checkNotNull(numberOfSlots);
@@ -115,6 +124,14 @@ public class TaskManagerServicesConfiguration {
                return tmpDirPaths;
        }
 
+       public String[] getLocalRecoveryStateRootDirectories() {
+               return localRecoveryStateRootDirectories;
+       }
+
+       public LocalRecoveryConfig.LocalRecoveryMode getLocalRecoveryMode() {
+               return localRecoveryMode;
+       }
+
        public NetworkEnvironmentConfiguration getNetworkConfig() {
                return networkConfig;
        }
@@ -185,6 +202,15 @@ public class TaskManagerServicesConfiguration {
                }
 
                final String[] tmpDirs = 
ConfigurationUtils.parseTempDirectories(configuration);
+               String[] localStateRootDir = 
ConfigurationUtils.parseLocalStateDirectories(configuration);
+
+               if (localStateRootDir.length == 0) {
+                       // default to temp dirs.
+                       localStateRootDir = tmpDirs;
+               }
+
+               LocalRecoveryConfig.LocalRecoveryMode localRecoveryMode =
+                       
LocalRecoveryConfig.LocalRecoveryMode.fromConfig(configuration);
 
                final NetworkEnvironmentConfiguration networkConfig = 
parseNetworkEnvironmentConfiguration(
                        configuration,
@@ -225,6 +251,8 @@ public class TaskManagerServicesConfiguration {
                return new TaskManagerServicesConfiguration(
                        remoteAddress,
                        tmpDirs,
+                       localStateRootDir,
+                       localRecoveryMode,
                        networkConfig,
                        queryableStateConfig,
                        slots,

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c22413e..1ecb47a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1202,25 +1202,24 @@ public class Task implements Runnable, TaskActions, 
CheckpointListener {
 
                if (executionState == ExecutionState.RUNNING && invokable != 
null) {
 
-                               Runnable runnable = new Runnable() {
-                                       @Override
-                                       public void run() {
-                                               try {
-                                                       
invokable.notifyCheckpointComplete(checkpointID);
-                                               
taskStateManager.notifyCheckpointComplete(checkpointID);}
-                                               catch (Throwable t) {
-                                                       if (getExecutionState() 
== ExecutionState.RUNNING) {
-                                                               // fail task if 
checkpoint confirmation failed.
-                                                               
failExternally(new RuntimeException(
-                                                                       "Error 
while confirming checkpoint",
-                                                                       t));
-                                                       }
+                       Runnable runnable = new Runnable() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               
invokable.notifyCheckpointComplete(checkpointID);
+                                               
taskStateManager.notifyCheckpointComplete(checkpointID);
+                                       } catch (Throwable t) {
+                                               if (getExecutionState() == 
ExecutionState.RUNNING) {
+                                                       // fail task if 
checkpoint confirmation failed.
+                                                       failExternally(new 
RuntimeException(
+                                                               "Error while 
confirming checkpoint",
+                                                               t));
                                                }
                                        }
-                               };
-                               executeAsyncCallRunnable(runnable, "Checkpoint 
Confirmation for " +
-                                               taskNameWithSubtask);
-
+                               }
+                       };
+                       executeAsyncCallRunnable(runnable, "Checkpoint 
Confirmation for " +
+                               taskNameWithSubtask);
                }
                else {
                        LOG.debug("Ignoring checkpoint commit notification for 
non-running task {}.", taskNameWithSubtask);

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 3173948..f62ef1b 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages
 import 
org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, 
StoppingFailure, StoppingResponse}
 import org.apache.flink.runtime.metrics.groups.{JobManagerMetricGroup, 
TaskManagerMetricGroup}
 import org.apache.flink.runtime.metrics.util.MetricUtils
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager
 import org.apache.flink.runtime.taskexecutor.{TaskExecutor, 
TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.util.EnvironmentInformation
@@ -238,6 +239,7 @@ class LocalFlinkMiniCluster(
     val taskManagerServices = TaskManagerServices.fromConfiguration(
       taskManagerServicesConfiguration,
       resourceID,
+      ioExecutor,
       EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag,
       EnvironmentInformation.getMaxJvmHeapMemory)
 
@@ -254,6 +256,7 @@ class LocalFlinkMiniCluster(
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment,
+      taskManagerServices.getTaskManagerStateStore,
       taskManagerMetricGroup)
 
     system.actorOf(props, taskManagerActorName)
@@ -318,6 +321,7 @@ class LocalFlinkMiniCluster(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
+    taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager,
     taskManagerMetricGroup: TaskManagerMetricGroup): Props = {
 
     TaskManager.getTaskManagerProps(
@@ -328,6 +332,7 @@ class LocalFlinkMiniCluster(
       memoryManager,
       ioManager,
       networkEnvironment,
+      taskManagerLocalStateStoresManager,
       highAvailabilityServices,
       taskManagerMetricGroup)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 485add5..15581b2 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -35,11 +35,11 @@ import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, 
QuarantineMonitor}
-import org.apache.flink.runtime.blob.{BlobCacheService, BlobClient, 
BlobService}
+import org.apache.flink.runtime.blob.BlobCacheService
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.clusterframework.BootstrapTools
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
-import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.clusterframework.types.{AllocationID, 
ResourceID}
 import org.apache.flink.runtime.concurrent.{Executors, FutureUtils}
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
 import org.apache.flink.runtime.execution.ExecutionState
@@ -126,6 +126,7 @@ class TaskManager(
     protected val memoryManager: MemoryManager,
     protected val ioManager: IOManager,
     protected val network: NetworkEnvironment,
+    protected val taskManagerLocalStateStoresManager: 
TaskExecutorLocalStateStoresManager,
     protected val numberOfSlots: Int,
     protected val highAvailabilityServices: HighAvailabilityServices,
     protected val taskManagerMetricGroup: TaskManagerMetricGroup)
@@ -253,6 +254,12 @@ class TaskManager(
     }
 
     try {
+      taskManagerLocalStateStoresManager.shutdown()
+    } catch {
+      case t: Exception => log.error("Task state manager did not shutdown 
properly.", t)
+    }
+
+    try {
       fileCache.shutdown()
     } catch {
       case t: Exception => log.error("FileCache did not shutdown properly.", t)
@@ -474,7 +481,7 @@ class TaskManager(
             log.debug(s"Cannot find task to stop for execution 
${executionID})")
             sender ! decorateMessage(Acknowledge.get())
           }
- 
+
         // cancels a task
         case CancelTask(executionID) =>
           val task = runningTasks.get(executionID)
@@ -984,7 +991,7 @@ class TaskManager(
         log.error(message, e)
         throw new RuntimeException(message, e)
     }
-    
+
     // watch job manager to detect when it dies
     context.watch(jobManager)
 
@@ -1070,7 +1077,7 @@ class TaskManager(
       // clear the key-value location oracle
       
proxy.updateKvStateLocationOracle(HighAvailabilityServices.DEFAULT_JOB_ID, null)
     }
-    
+
     // failsafe shutdown of the metrics registry
     try {
       taskManagerMetricGroup.close()
@@ -1195,18 +1202,21 @@ class TaskManager(
           config.getTimeout().getSize(),
           config.getTimeout().getUnit()))
 
-      // TODO: wire this so that the manager survives the end of the task
-      val taskExecutorLocalStateStoresManager = new 
TaskExecutorLocalStateStoresManager
+      val jobID = jobInformation.getJobId
 
-      val localStateStore = 
taskExecutorLocalStateStoresManager.localStateStoreForTask(
-        jobInformation.getJobId,
+      // Allocation ids do not work properly without flip-6, so we just fake 
one, based on the jid.
+      val fakeAllocationID = new AllocationID(jobID.getLowerPart, 
jobID.getUpperPart)
+
+      val taskLocalStateStore = 
taskManagerLocalStateStoresManager.localStateStoreForSubtask(
+        jobID,
+        fakeAllocationID,
         taskInformation.getJobVertexId,
         tdd.getSubtaskIndex)
 
-      val slotStateManager = new TaskStateManagerImpl(
-        jobInformation.getJobId,
+      val taskStateManager = new TaskStateManagerImpl(
+        jobID,
         tdd.getExecutionAttemptId,
-        localStateStore,
+        taskLocalStateStore,
         tdd.getTaskRestore,
         checkpointResponder)
 
@@ -1224,7 +1234,7 @@ class TaskManager(
         ioManager,
         network,
         bcVarManager,
-        slotStateManager,
+        taskStateManager,
         taskManagerConnection,
         inputSplitProvider,
         checkpointResponder,
@@ -2013,6 +2023,7 @@ object TaskManager {
     val taskManagerServices = TaskManagerServices.fromConfiguration(
       taskManagerServicesConfiguration,
       resourceID,
+      actorSystem.dispatcher,
       EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag,
       EnvironmentInformation.getMaxJvmHeapMemory)
 
@@ -2030,6 +2041,7 @@ object TaskManager {
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment(),
+      taskManagerServices.getTaskManagerStateStore(),
       highAvailabilityServices,
       taskManagerMetricGroup)
 
@@ -2047,6 +2059,7 @@ object TaskManager {
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
+    taskStateManager: TaskExecutorLocalStateStoresManager,
     highAvailabilityServices: HighAvailabilityServices,
     taskManagerMetricGroup: TaskManagerMetricGroup
   ): Props = {
@@ -2058,6 +2071,7 @@ object TaskManager {
       memoryManager,
       ioManager,
       networkEnvironment,
+      taskStateManager,
       taskManagerConfig.getNumberSlots(),
       highAvailabilityServices,
       taskManagerMetricGroup)

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index cbfe0ed..32b32cf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -25,8 +25,9 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.util.TestLogger;
@@ -93,8 +94,8 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
 
                KeyedStateHandle managedKeyedHandle = 
mock(KeyedStateHandle.class);
                KeyedStateHandle rawKeyedHandle = mock(KeyedStateHandle.class);
-               OperatorStateHandle managedOpHandle = 
mock(OperatorStateHandle.class);
-               OperatorStateHandle rawOpHandle = 
mock(OperatorStateHandle.class);
+               OperatorStateHandle managedOpHandle = 
mock(OperatorStreamStateHandle.class);
+               OperatorStateHandle rawOpHandle = 
mock(OperatorStreamStateHandle.class);
 
                final OperatorSubtaskState operatorSubtaskState = spy(new 
OperatorSubtaskState(
                        managedOpHandle,

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index c791fd8..1b2062a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateHandleID;
@@ -52,7 +53,6 @@ import 
org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
-import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -62,7 +62,6 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.mockito.verification.VerificationMode;
@@ -2739,7 +2738,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                metaInfoMap.put("t-6", new 
OperatorStateHandle.StateMetaInfo(new long[]{121, 143, 147}, 
OperatorStateHandle.Mode.BROADCAST));
 
                // this is what a single task will return
-               OperatorStateHandle osh = new OperatorStateHandle(metaInfoMap, 
new ByteStreamStateHandle("test", new byte[150]));
+               OperatorStateHandle osh = new 
OperatorStreamStateHandle(metaInfoMap, new ByteStreamStateHandle("test", new 
byte[150]));
 
                OperatorStateRepartitioner repartitioner = 
RoundRobinOperatorStateRepartitioner.INSTANCE;
                List<Collection<OperatorStateHandle>> repartitionedStates =
@@ -2817,7 +2816,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
 
                KeyGroupRangeOffsets keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(keyGroupRange, serializedDataWithOffsets.f1.get(0));
 
-               ByteStreamStateHandle allSerializedStatesHandle = new 
TestByteStreamStateHandleDeepCompare(
+               ByteStreamStateHandle allSerializedStatesHandle = new 
ByteStreamStateHandle(
                                String.valueOf(UUID.randomUUID()),
                                serializedDataWithOffsets.f0);
 
@@ -2936,11 +2935,11 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        ++idx;
                }
 
-               ByteStreamStateHandle streamStateHandle = new 
TestByteStreamStateHandleDeepCompare(
+               ByteStreamStateHandle streamStateHandle = new 
ByteStreamStateHandle(
                        String.valueOf(UUID.randomUUID()),
                        serializationWithOffsets.f0);
 
-               return new OperatorStateHandle(offsetsMap, streamStateHandle);
+               return new OperatorStreamStateHandle(offsetsMap, 
streamStateHandle);
        }
 
        static ExecutionJobVertex mockExecutionJobVertex(
@@ -3265,7 +3264,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                }
 
                                OperatorStateHandle.Mode mode = r.nextInt(10) 
== 0 ?
-                                               OperatorStateHandle.Mode.UNION 
: OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
+                                       OperatorStateHandle.Mode.UNION : 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
                                namedStatesToOffsets.put(
                                                "State-" + s,
                                                new 
OperatorStateHandle.StateMetaInfo(offs, mode));
@@ -3282,7 +3281,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        }
 
                        previousParallelOpInstanceStates.add(
-                                       new 
OperatorStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1)));
+                                       new 
OperatorStreamStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, 
-1)));
                }
 
                Map<StreamStateHandle, Map<String, List<Long>>> expected = new 
HashMap<>();
@@ -3769,10 +3768,10 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                        OperatorSubtaskState operatorSubtaskState =
                                spy(new OperatorSubtaskState(
-                                       
Collections.<OperatorStateHandle>emptyList(),
-                                       
Collections.<OperatorStateHandle>emptyList(),
-                                       
Collections.<KeyedStateHandle>singletonList(managedState),
-                                       
Collections.<KeyedStateHandle>emptyList()));
+                                       StateObjectCollection.empty(),
+                                       StateObjectCollection.empty(),
+                                       
StateObjectCollection.singleton(managedState),
+                                       StateObjectCollection.empty()));
 
                        Map<OperatorID, OperatorSubtaskState> opStates = new 
HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index 70794c6..ff787ec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
-import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
@@ -69,7 +69,7 @@ public class CheckpointMetadataLoadingTest {
                OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
 
                OperatorSubtaskState subtaskState = new OperatorSubtaskState(
-                               new OperatorStateHandle(
+                               new OperatorStreamStateHandle(
                                Collections.emptyMap(),
                                new ByteStreamStateHandle("testHandler", new 
byte[0])),
                                null,

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index ab353a9..af6ec71 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
@@ -123,10 +122,10 @@ public class CheckpointStateRestoreTest {
                        subtaskStates.putSubtaskStateByOperatorID(
                                OperatorID.fromJobVertexID(statefulId),
                                new OperatorSubtaskState(
-                                       
Collections.<OperatorStateHandle>emptyList(),
-                                       
Collections.<OperatorStateHandle>emptyList(),
-                                       
Collections.singletonList(serializedKeyGroupStates),
-                                       
Collections.<KeyedStateHandle>emptyList()));
+                                       StateObjectCollection.empty(),
+                                       StateObjectCollection.empty(),
+                                       
StateObjectCollection.singleton(serializedKeyGroupStates),
+                                       StateObjectCollection.empty()));
 
                        coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new 
CheckpointMetrics(), subtaskStates));
                        coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, new 
CheckpointMetrics(), subtaskStates));

Reply via email to