http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 4898292..123cec2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -51,6 +51,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.stream.Stream;
 
@@ -62,8 +63,11 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * @param <K> Type of the key by which state is keyed.
  */
-public abstract class AbstractKeyedStateBackend<K>
-               implements KeyedStateBackend<K>, 
Snapshotable<KeyedStateHandle>, Closeable, CheckpointListener {
+public abstract class AbstractKeyedStateBackend<K> implements
+       KeyedStateBackend<K>,
+       Snapshotable<SnapshotResult<KeyedStateHandle>, 
Collection<KeyedStateHandle>>,
+       Closeable,
+       CheckpointListener {
 
        /** {@link TypeSerializer} for our key. */
        protected final TypeSerializer<K> keySerializer;
@@ -110,7 +114,7 @@ public abstract class AbstractKeyedStateBackend<K>
                KeyGroupRange keyGroupRange,
                ExecutionConfig executionConfig) {
 
-               this.kvStateRegistry = kvStateRegistry; 
//Preconditions.checkNotNull(kvStateRegistry);
+               this.kvStateRegistry = kvStateRegistry;
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
                this.numberOfKeyGroups = 
Preconditions.checkNotNull(numberOfKeyGroups);
                this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
index e8997cd..3c6794e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.core.fs.FSDataOutputStream;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -38,9 +40,9 @@ public interface CheckpointStreamFactory {
         * @param scope The state's scope, whether it is exclusive or shared.
         * @return An output stream that writes state for the given checkpoint.
         *
-        * @throws Exception Exceptions may occur while creating the stream and 
should be forwarded.
+        * @throws IOException Exceptions may occur while creating the stream 
and should be forwarded.
         */
-       CheckpointStateOutputStream 
createCheckpointStateOutputStream(CheckpointedStateScope scope) throws 
Exception;
+       CheckpointStateOutputStream 
createCheckpointStateOutputStream(CheckpointedStateScope scope) throws 
IOException;
 
        /**
         * A dedicated output stream that produces a {@link StreamStateHandle} 
when closed.
@@ -69,6 +71,7 @@ public interface CheckpointStreamFactory {
                 * @return A state handle that can create an input stream 
producing the data written to this stream.
                 * @throws IOException Thrown, if the stream cannot be closed.
                 */
+               @Nullable
                public abstract StreamStateHandle closeAndGetHandle() throws 
IOException;
 
                /**
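
For context, a minimal usage sketch under the tightened contract (checked IOException and a possibly-null handle); the factory instance and the state bytes are assumed to be supplied by the surrounding backend code:

    import org.apache.flink.runtime.state.CheckpointStreamFactory;
    import org.apache.flink.runtime.state.CheckpointedStateScope;
    import org.apache.flink.runtime.state.StreamStateHandle;

    import javax.annotation.Nullable;

    import java.io.IOException;

    /** Sketch: writing state through the factory under the new IOException/nullable contract. */
    class CheckpointStreamFactoryUsageSketch {

        @Nullable
        StreamStateHandle writeState(CheckpointStreamFactory factory, byte[] stateBytes) throws IOException {

            CheckpointStreamFactory.CheckpointStateOutputStream out =
                factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);

            try {
                out.write(stateBytes);
                // The handle may be null, e.g. if nothing was effectively written to the stream.
                return out.closeAndGetHandle();
            } catch (IOException ex) {
                // Best-effort cleanup of the partially written stream before re-throwing.
                out.close();
                throw ex;
            }
        }
    }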

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
new file mode 100644
index 0000000..41c35d4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.filesystem.FileBasedStateOutputStream;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Interface that provides access to a CheckpointStateOutputStream and a method to provide the {@link SnapshotResult}.
+ * This abstracts from different ways that a result is obtained from checkpoint output streams.
+ */
+public interface CheckpointStreamWithResultProvider extends Closeable {
+
+       Logger LOG = 
LoggerFactory.getLogger(CheckpointStreamWithResultProvider.class);
+
+       /**
+        * Closes the stream and returns a snapshot result with the stream handle(s).
+        */
+       @Nonnull
+       SnapshotResult<StreamStateHandle> 
closeAndFinalizeCheckpointStreamResult() throws IOException;
+
+       /**
+        * Returns the encapsulated output stream.
+        */
+       @Nonnull
+       CheckpointStreamFactory.CheckpointStateOutputStream 
getCheckpointOutputStream();
+
+       @Override
+       default void close() throws IOException {
+               getCheckpointOutputStream().close();
+       }
+
+       /**
+        * Implementation of {@link CheckpointStreamWithResultProvider} that 
only creates the
+        * primary/remote/jm-owned state.
+        */
+       class PrimaryStreamOnly implements CheckpointStreamWithResultProvider {
+
+               @Nonnull
+               private final 
CheckpointStreamFactory.CheckpointStateOutputStream outputStream;
+
+               public PrimaryStreamOnly(@Nonnull 
CheckpointStreamFactory.CheckpointStateOutputStream outputStream) {
+                       this.outputStream = outputStream;
+               }
+
+               @Nonnull
+               @Override
+               public SnapshotResult<StreamStateHandle> 
closeAndFinalizeCheckpointStreamResult() throws IOException {
+                       return 
SnapshotResult.of(outputStream.closeAndGetHandle());
+               }
+
+               @Nonnull
+               @Override
+               public CheckpointStreamFactory.CheckpointStateOutputStream 
getCheckpointOutputStream() {
+                       return outputStream;
+               }
+       }
+
+       /**
+        * Implementation of {@link CheckpointStreamWithResultProvider} that creates both the
+        * primary/remote/jm-owned state and the secondary/local/tm-owned state.
+        */
+       class PrimaryAndSecondaryStream implements 
CheckpointStreamWithResultProvider {
+
+               private static final Logger LOG = 
LoggerFactory.getLogger(PrimaryAndSecondaryStream.class);
+
+               @Nonnull
+               private final DuplicatingCheckpointOutputStream outputStream;
+
+               public PrimaryAndSecondaryStream(
+                       @Nonnull 
CheckpointStreamFactory.CheckpointStateOutputStream primaryOut,
+                       CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOut) throws IOException {
+                       this(new DuplicatingCheckpointOutputStream(primaryOut, 
secondaryOut));
+               }
+
+               PrimaryAndSecondaryStream(@Nonnull 
DuplicatingCheckpointOutputStream outputStream) {
+                       this.outputStream = outputStream;
+               }
+
+               @Nonnull
+               @Override
+               public SnapshotResult<StreamStateHandle> 
closeAndFinalizeCheckpointStreamResult() throws IOException {
+
+                       final StreamStateHandle primaryStreamStateHandle;
+
+                       try {
+                               primaryStreamStateHandle = 
outputStream.closeAndGetPrimaryHandle();
+                       } catch (IOException primaryEx) {
+                               try {
+                                       outputStream.close();
+                               } catch (IOException closeEx) {
+                                       primaryEx = 
ExceptionUtils.firstOrSuppressed(closeEx, primaryEx);
+                               }
+                               throw primaryEx;
+                       }
+
+                       StreamStateHandle secondaryStreamStateHandle = null;
+
+                       try {
+                               secondaryStreamStateHandle = 
outputStream.closeAndGetSecondaryHandle();
+                       } catch (IOException secondaryEx) {
+                               LOG.warn("Exception from secondary/local 
checkpoint stream.", secondaryEx);
+                       }
+
+                       if (primaryStreamStateHandle != null) {
+                               if (secondaryStreamStateHandle != null) {
+                                       return 
SnapshotResult.withLocalState(primaryStreamStateHandle, 
secondaryStreamStateHandle);
+                               } else {
+                                       return 
SnapshotResult.of(primaryStreamStateHandle);
+                               }
+                       } else {
+                               return SnapshotResult.empty();
+                       }
+               }
+
+               @Nonnull
+               @Override
+               public DuplicatingCheckpointOutputStream 
getCheckpointOutputStream() {
+                       return outputStream;
+               }
+       }
+
+       @Nonnull
+       static CheckpointStreamWithResultProvider createSimpleStream(
+               @Nonnull CheckpointedStateScope checkpointedStateScope,
+               @Nonnull CheckpointStreamFactory primaryStreamFactory) throws 
IOException {
+
+               CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
+                       
primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
+
+               return new 
CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
+       }
+
+       @Nonnull
+       static CheckpointStreamWithResultProvider createDuplicatingStream(
+               @Nonnegative long checkpointId,
+               @Nonnull CheckpointedStateScope checkpointedStateScope,
+               @Nonnull CheckpointStreamFactory primaryStreamFactory,
+               @Nonnull LocalRecoveryDirectoryProvider 
secondaryStreamDirProvider) throws IOException {
+
+               CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
+                       
primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
+
+               try {
+                       File outFile = new File(
+                               
secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(checkpointId),
+                               String.valueOf(UUID.randomUUID()));
+                       Path outPath = new Path(outFile.toURI());
+
+                       CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOut =
+                               new 
FileBasedStateOutputStream(outPath.getFileSystem(), outPath);
+
+                       return new 
CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(primaryOut, 
secondaryOut);
+               } catch (IOException secondaryEx) {
+                       LOG.warn("Exception when opening secondary/local 
checkpoint output stream. " +
+                               "Continue only with the primary stream.", 
secondaryEx);
+               }
+
+               return new 
CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
+       }
+
+
+       /**
+        * Helper method that takes a {@link SnapshotResult<StreamStateHandle>} and a {@link KeyGroupRangeOffsets} and
+        * creates a {@link SnapshotResult<KeyGroupsStateHandle>} by combining the key groups offsets with all the
+        * present stream state handles.
+        */
+       @Nonnull
+       static SnapshotResult<KeyedStateHandle> 
toKeyedStateHandleSnapshotResult(
+               @Nonnull SnapshotResult<StreamStateHandle> snapshotResult,
+               @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets) {
+
+               StreamStateHandle jobManagerOwnedSnapshot = 
snapshotResult.getJobManagerOwnedSnapshot();
+
+               if (jobManagerOwnedSnapshot != null) {
+
+                       KeyedStateHandle jmKeyedState = new 
KeyGroupsStateHandle(keyGroupRangeOffsets, jobManagerOwnedSnapshot);
+                       StreamStateHandle taskLocalSnapshot = 
snapshotResult.getTaskLocalSnapshot();
+
+                       if (taskLocalSnapshot != null) {
+
+                               KeyedStateHandle localKeyedState = new 
KeyGroupsStateHandle(keyGroupRangeOffsets, taskLocalSnapshot);
+                               return 
SnapshotResult.withLocalState(jmKeyedState, localKeyedState);
+                       } else {
+
+                               return SnapshotResult.of(jmKeyedState);
+                       }
+               } else {
+
+                       return SnapshotResult.empty();
+               }
+       }
+}
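
A minimal sketch of the intended call pattern for the new provider, assuming the stream factory, local directory provider, serialized bytes, and the single-key-group offsets are supplied or chosen purely for illustration:

    import org.apache.flink.runtime.state.CheckpointStreamFactory;
    import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
    import org.apache.flink.runtime.state.CheckpointedStateScope;
    import org.apache.flink.runtime.state.KeyGroupRange;
    import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
    import org.apache.flink.runtime.state.KeyedStateHandle;
    import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
    import org.apache.flink.runtime.state.SnapshotResult;
    import org.apache.flink.runtime.state.StreamStateHandle;

    import java.io.IOException;

    /** Sketch: writing keyed state through the provider and mapping the result to a keyed handle. */
    class ProviderUsageSketch {

        SnapshotResult<KeyedStateHandle> writeSnapshot(
                long checkpointId,
                boolean localRecoveryEnabled,
                CheckpointStreamFactory primaryFactory,
                LocalRecoveryDirectoryProvider localDirProvider,
                byte[] serializedState) throws IOException {

            // Duplicate into a local (secondary) stream only if local recovery is enabled.
            CheckpointStreamWithResultProvider provider = localRecoveryEnabled
                ? CheckpointStreamWithResultProvider.createDuplicatingStream(
                    checkpointId, CheckpointedStateScope.EXCLUSIVE, primaryFactory, localDirProvider)
                : CheckpointStreamWithResultProvider.createSimpleStream(
                    CheckpointedStateScope.EXCLUSIVE, primaryFactory);

            try {
                provider.getCheckpointOutputStream().write(serializedState);

                // Yields the job-manager-owned handle and, if local recovery succeeded, a task-local handle.
                SnapshotResult<StreamStateHandle> streamResult =
                    provider.closeAndFinalizeCheckpointStreamResult();

                // Illustrative offsets: one key group whose data starts at offset 0.
                KeyGroupRangeOffsets offsets =
                    new KeyGroupRangeOffsets(new KeyGroupRange(0, 0), new long[]{0L});

                return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(streamResult, offsets);
            } catch (IOException ex) {
                provider.close(); // release the underlying stream(s) on failure
                throw ex;
            }
        }
    }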

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 266483f..01a397a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -20,10 +20,10 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -45,6 +45,8 @@ import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -133,7 +135,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
        public DefaultOperatorStateBackend(
                ClassLoader userClassLoader,
                ExecutionConfig executionConfig,
-               boolean asynchronousSnapshots) throws IOException {
+               boolean asynchronousSnapshots) {
 
                this.closeStreamOnCancelRegistry = new CloseableRegistry();
                this.userClassloader = 
Preconditions.checkNotNull(userClassLoader);
@@ -299,7 +301,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
        // 
-------------------------------------------------------------------------------------------
 
        @Override
-       public RunnableFuture<OperatorStateHandle> snapshot(
+       public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
                        final long checkpointId,
                        final long timestamp,
                        final CheckpointStreamFactory streamFactory,
@@ -308,7 +310,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                final long syncStartTime = System.currentTimeMillis();
 
                if (registeredOperatorStates.isEmpty() && 
registeredBroadcastStates.isEmpty()) {
-                       return DoneFuture.nullValue();
+                       return DoneFuture.of(SnapshotResult.empty());
                }
 
                final Map<String, PartitionableListState<?>> 
registeredOperatorStatesDeepCopies =
@@ -346,8 +348,8 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                }
 
                // implementation of the async IO operation, based on FutureTask
-               final AbstractAsyncCallableWithResources<OperatorStateHandle> 
ioCallable =
-                       new 
AbstractAsyncCallableWithResources<OperatorStateHandle>() {
+               final 
AbstractAsyncCallableWithResources<SnapshotResult<OperatorStateHandle>> 
ioCallable =
+                       new 
AbstractAsyncCallableWithResources<SnapshotResult<OperatorStateHandle>>() {
 
                                
CheckpointStreamFactory.CheckpointStateOutputStream out = null;
 
@@ -357,12 +359,12 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                }
 
                                @Override
-                               protected void releaseResources() throws 
Exception {
+                               protected void releaseResources() {
                                        closeOutStream();
                                }
 
                                @Override
-                               protected void stopOperation() throws Exception 
{
+                               protected void stopOperation() {
                                        closeOutStream();
                                }
 
@@ -377,8 +379,9 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                        }
                                }
 
+                               @Nonnull
                                @Override
-                               public OperatorStateHandle performOperation() 
throws Exception {
+                               public SnapshotResult<OperatorStateHandle> 
performOperation() throws Exception {
                                        long asyncStartTime = 
System.currentTimeMillis();
 
                                        
CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;
@@ -444,7 +447,7 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                                StreamStateHandle stateHandle = 
out.closeAndGetHandle();
 
                                                if (stateHandle != null) {
-                                                       retValue = new 
OperatorStateHandle(writtenStatesMetaData, stateHandle);
+                                                       retValue = new 
OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
                                                }
                                        }
 
@@ -453,11 +456,12 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                                                        streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
                                        }
 
-                                       return retValue;
+                                       return SnapshotResult.of(retValue);
                                }
                        };
 
-               AsyncStoppableTaskWithCallback<OperatorStateHandle> task = 
AsyncStoppableTaskWithCallback.from(ioCallable);
+               
AsyncStoppableTaskWithCallback<SnapshotResult<OperatorStateHandle>> task =
+                       AsyncStoppableTaskWithCallback.from(ioCallable);
 
                if (!asynchronousSnapshots) {
                        task.run();
@@ -469,10 +473,9 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                return task;
        }
 
-       @Override
        public void restore(Collection<OperatorStateHandle> restoreSnapshots) 
throws Exception {
 
-               if (null == restoreSnapshots) {
+               if (null == restoreSnapshots || restoreSnapshots.isEmpty()) {
                        return;
                }
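
A minimal sketch of consuming the SnapshotResult-typed future, assuming the OperatorStateBackend interface now carries this snapshot signature and that the backend, stream factory, and checkpoint options come from the surrounding task code:

    import org.apache.flink.runtime.checkpoint.CheckpointOptions;
    import org.apache.flink.runtime.state.CheckpointStreamFactory;
    import org.apache.flink.runtime.state.OperatorStateBackend;
    import org.apache.flink.runtime.state.OperatorStateHandle;
    import org.apache.flink.runtime.state.SnapshotResult;

    import java.util.concurrent.RunnableFuture;

    /** Sketch: consuming the SnapshotResult-typed future returned by the operator state backend. */
    class OperatorSnapshotSketch {

        OperatorStateHandle runSnapshot(
                OperatorStateBackend backend,
                long checkpointId,
                long timestamp,
                CheckpointStreamFactory streamFactory,
                CheckpointOptions options) throws Exception {

            RunnableFuture<SnapshotResult<OperatorStateHandle>> future =
                backend.snapshot(checkpointId, timestamp, streamFactory, options);

            // For synchronous snapshots the returned future is already done (a DoneFuture);
            // otherwise running it executes the asynchronous part of the snapshot.
            if (!future.isDone()) {
                future.run();
            }

            SnapshotResult<OperatorStateHandle> result = future.get();

            // The job-manager-owned handle is acknowledged to the checkpoint coordinator; it is
            // null for empty snapshots. A task-local handle (if any) stays on the TaskManager.
            return result.getJobManagerOwnedSnapshot();
        }
    }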
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java
new file mode 100644
index 0000000..fc74638
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryKeyedStateHandle.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ * This class is a keyed state handle based on a directory. It combines a {@link DirectoryStateHandle} and a
+ * {@link KeyGroupRange}.
+ */
+public class DirectoryKeyedStateHandle implements KeyedStateHandle {
+
+       private static final long serialVersionUID = 1L;
+
+       /** The directory state handle. */
+       @Nonnull
+       private final DirectoryStateHandle directoryStateHandle;
+
+       /** The key-group range. */
+       @Nonnull
+       private final KeyGroupRange keyGroupRange;
+
+       public DirectoryKeyedStateHandle(
+               @Nonnull DirectoryStateHandle directoryStateHandle,
+               @Nonnull KeyGroupRange keyGroupRange) {
+
+               this.directoryStateHandle = directoryStateHandle;
+               this.keyGroupRange = keyGroupRange;
+       }
+
+       @Nonnull
+       public DirectoryStateHandle getDirectoryStateHandle() {
+               return directoryStateHandle;
+       }
+
+       @Nonnull
+       @Override
+       public KeyGroupRange getKeyGroupRange() {
+               return keyGroupRange;
+       }
+
+       @Override
+       public void discardState() throws Exception {
+               directoryStateHandle.discardState();
+       }
+
+       @Override
+       public long getStateSize() {
+               return directoryStateHandle.getStateSize();
+       }
+
+       @Override
+       public KeyedStateHandle getIntersection(KeyGroupRange 
otherKeyGroupRange) {
+               return 
this.keyGroupRange.getIntersection(otherKeyGroupRange).getNumberOfKeyGroups() > 
0 ? this : null;
+       }
+
+       @Override
+       public void registerSharedStates(SharedStateRegistry stateRegistry) {
+               // Nothing to do, this is for local use only.
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               DirectoryKeyedStateHandle that = (DirectoryKeyedStateHandle) o;
+
+               if 
(!getDirectoryStateHandle().equals(that.getDirectoryStateHandle())) {
+                       return false;
+               }
+               return getKeyGroupRange().equals(that.getKeyGroupRange());
+       }
+
+       @Override
+       public int hashCode() {
+               int result = getDirectoryStateHandle().hashCode();
+               result = 31 * result + getKeyGroupRange().hashCode();
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return "DirectoryKeyedStateHandle{" +
+                       "directoryStateHandle=" + directoryStateHandle +
+                       ", keyGroupRange=" + keyGroupRange +
+                       '}';
+       }
+}
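
A minimal sketch of the intersection semantics, assuming an illustrative local path and key-group ranges:

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.state.DirectoryKeyedStateHandle;
    import org.apache.flink.runtime.state.DirectoryStateHandle;
    import org.apache.flink.runtime.state.KeyGroupRange;
    import org.apache.flink.runtime.state.KeyedStateHandle;

    /** Sketch: a directory-backed keyed handle is only relevant if its key groups overlap. */
    class DirectoryKeyedStateHandleSketch {

        public static void main(String[] args) {
            DirectoryStateHandle dirHandle =
                new DirectoryStateHandle(new Path("file:///tmp/localstate/chk_1")); // illustrative path

            DirectoryKeyedStateHandle handle =
                new DirectoryKeyedStateHandle(dirHandle, new KeyGroupRange(0, 31));

            // Overlapping range: the handle is returned as-is, because a directory cannot be split by key group.
            KeyedStateHandle forOverlap = handle.getIntersection(new KeyGroupRange(16, 47));

            // Disjoint range: the handle is irrelevant for that subtask and null is returned.
            KeyedStateHandle forDisjoint = handle.getIntersection(new KeyGroupRange(64, 95));

            System.out.println(forOverlap == handle);  // true
            System.out.println(forDisjoint == null);   // true
        }
    }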

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
new file mode 100644
index 0000000..2d08777
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * This state handle represents a directory. This class is, for example, used to represent the directory of a
+ * native RocksDB checkpoint for local recovery.
+ */
+public class DirectoryStateHandle implements StateObject {
+
+       /** Serial version. */
+       private static final long serialVersionUID = 1L;
+
+       /** The path that describes the directory. */
+       @Nonnull
+       private final Path directory;
+
+       public DirectoryStateHandle(@Nonnull Path directory) {
+               this.directory = directory;
+       }
+
+       @Override
+       public void discardState() throws IOException {
+               FileSystem fileSystem = directory.getFileSystem();
+               fileSystem.delete(directory, true);
+       }
+
+       @Override
+       public long getStateSize() {
+               // For now, we will not report any size, but in the future this 
could (if needed) return the total dir size.
+               return 0L; // unknown
+       }
+
+       @Nonnull
+       public Path getDirectory() {
+               return directory;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               DirectoryStateHandle that = (DirectoryStateHandle) o;
+
+               return directory.equals(that.directory);
+       }
+
+       @Override
+       public int hashCode() {
+               return directory.hashCode();
+       }
+
+       @Override
+       public String toString() {
+               return "DirectoryStateHandle{" +
+                       "directory=" + directory +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
index d2d808d..556212f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java
@@ -17,11 +17,10 @@
  */
 package org.apache.flink.runtime.state;
 
-import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
 import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /**
  * A {@link Future} that is always done and will just yield the object that 
was given at creation
@@ -31,11 +30,10 @@ import java.util.concurrent.TimeoutException;
  */
 public class DoneFuture<T> implements RunnableFuture<T> {
 
-       private static final DoneFuture<?> NULL_FUTURE = new 
DoneFuture<Object>(null);
-
+       @Nullable
        private final T payload;
 
-       public DoneFuture(T payload) {
+       protected DoneFuture(@Nullable T payload) {
                this.payload = payload;
        }
 
@@ -55,14 +53,14 @@ public class DoneFuture<T> implements RunnableFuture<T> {
        }
 
        @Override
-       public T get() throws InterruptedException, ExecutionException {
+       public T get() {
                return payload;
        }
 
        @Override
        public T get(
                        long timeout,
-                       TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
+                       TimeUnit unit) {
                return get();
        }
 
@@ -71,8 +69,8 @@ public class DoneFuture<T> implements RunnableFuture<T> {
 
        }
 
-       @SuppressWarnings("unchecked")
-       public static <T> DoneFuture<T> nullValue() {
-               return (DoneFuture<T>) NULL_FUTURE;
+
+       public static <T> DoneFuture<T> of(@Nullable T result) {
+               return new DoneFuture<>(result);
        }
 }
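
A minimal sketch of the replacement pattern for the removed nullValue() factory, assuming an operator snapshot with nothing to write:

    import org.apache.flink.runtime.state.DoneFuture;
    import org.apache.flink.runtime.state.OperatorStateHandle;
    import org.apache.flink.runtime.state.SnapshotResult;

    import java.util.concurrent.RunnableFuture;

    /** Sketch: an already-completed snapshot future for the "nothing to snapshot" case. */
    class DoneFutureSketch {

        RunnableFuture<SnapshotResult<OperatorStateHandle>> emptySnapshot() {
            // Formerly DoneFuture.nullValue(); the empty result is now carried explicitly.
            return DoneFuture.of(SnapshotResult.empty());
        }
    }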

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
new file mode 100644
index 0000000..259900c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStream.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * A CheckpointStateOutputStream that wraps a primary and a secondary CheckpointStateOutputStream and duplicates
+ * all writes into both streams. This stream applies buffering to reduce the amount of dual-method calling. Furthermore,
+ * exceptions that happen in interactions with the secondary stream are not exposed until the user calls
+ * {@link #closeAndGetSecondaryHandle()}. In contrast to that, exceptions from interactions with the primary stream
+ * are immediately returned to the user. This class is used to write state for local recovery as a local (secondary)
+ * copy of the (primary) state snapshot that is written to a (slower but highly-available) remote filesystem.
+ */
+public class DuplicatingCheckpointOutputStream extends 
CheckpointStreamFactory.CheckpointStateOutputStream {
+
+       /** Default buffer size of 8KB. */
+       private static final int DEFAULT_BUFFER_SIZER = 8 * 1024;
+
+       /** Write buffer. */
+       private final byte[] buffer;
+
+       /** Position in the write buffer. */
+       private int bufferIdx;
+
+       /**
+        * Primary stream for writing the checkpoint data. Failures from this stream are forwarded.
+        */
+       private final CheckpointStreamFactory.CheckpointStateOutputStream primaryOutputStream;
+
+       /**
+        * Secondary stream for writing the checkpoint data. Failures from this stream are not forwarded until
+        * {@link #closeAndGetSecondaryHandle()}.
+        */
+       private final CheckpointStreamFactory.CheckpointStateOutputStream secondaryOutputStream;
+
+       /**
+        * Stores a potential exception that occurred while interacting with {@link #secondaryOutputStream}.
+        */
+       private Exception secondaryStreamException;
+
+       public DuplicatingCheckpointOutputStream(
+               CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream,
+               CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream) throws IOException {
+               this(primaryOutputStream, secondaryOutputStream, 
DEFAULT_BUFFER_SIZER);
+       }
+
+       public DuplicatingCheckpointOutputStream(
+               CheckpointStreamFactory.CheckpointStateOutputStream 
primaryOutputStream,
+               CheckpointStreamFactory.CheckpointStateOutputStream 
secondaryOutputStream,
+               int bufferSize) throws IOException {
+
+               this.primaryOutputStream = 
Preconditions.checkNotNull(primaryOutputStream);
+               this.secondaryOutputStream = 
Preconditions.checkNotNull(secondaryOutputStream);
+
+               this.buffer = new byte[bufferSize];
+               this.bufferIdx = 0;
+
+               this.secondaryStreamException = null;
+
+               checkForAlignedStreamPositions();
+       }
+
+       @Override
+       public void write(int b) throws IOException {
+
+               if (buffer.length <= bufferIdx) {
+                       flushInternalBuffer();
+               }
+
+               buffer[bufferIdx] = (byte) b;
+               ++bufferIdx;
+       }
+
+       @Override
+       public void write(byte[] b) throws IOException {
+
+               write(b, 0, b.length);
+       }
+
+       @Override
+       public void write(byte[] b, int off, int len) throws IOException {
+
+               if (buffer.length <= len) {
+
+                       flushInternalBuffer();
+                       writeThroughInternal(b, off, len);
+               } else {
+
+                       if (buffer.length < len + bufferIdx) {
+                               flushInternalBuffer();
+                       }
+
+                       System.arraycopy(b, off, buffer, bufferIdx, len);
+                       bufferIdx += len;
+               }
+       }
+
+       @Override
+       public long getPos() throws IOException {
+               final long referencePos = primaryOutputStream.getPos();
+               return referencePos + bufferIdx;
+       }
+
+       @Override
+       public void flush() throws IOException {
+
+               flushInternalBuffer();
+               primaryOutputStream.flush();
+
+               if (secondaryStreamException == null) {
+                       try {
+                               secondaryOutputStream.flush();
+                       } catch (Exception flushEx) {
+                               handleSecondaryStreamOnException(flushEx);
+                       }
+               }
+       }
+
+       @Override
+       public void sync() throws IOException {
+
+               flushInternalBuffer();
+               primaryOutputStream.sync();
+
+               if (secondaryStreamException == null) {
+                       try {
+                               secondaryOutputStream.sync();
+                       } catch (Exception syncEx) {
+                               handleSecondaryStreamOnException(syncEx);
+                       }
+               }
+       }
+
+       @Override
+       public void close() throws IOException {
+
+               Exception exCollector = null;
+
+               try {
+                       flushInternalBuffer();
+               } catch (Exception flushEx) {
+                       exCollector = flushEx;
+               }
+
+               try {
+                       primaryOutputStream.close();
+               } catch (Exception closeEx) {
+                       exCollector = ExceptionUtils.firstOrSuppressed(closeEx, 
exCollector);
+               }
+
+               if (secondaryStreamException == null) {
+                       try {
+                               secondaryOutputStream.close();
+                       } catch (Exception closeEx) {
+                               handleSecondaryStreamOnException(closeEx);
+                       }
+               }
+
+               if (exCollector != null) {
+                       throw new IOException("Exception while closing 
duplicating stream.", exCollector);
+               }
+       }
+
+       private void checkForAlignedStreamPositions() throws IOException {
+
+               if (secondaryStreamException != null) {
+                       return;
+               }
+
+               final long primaryPos = primaryOutputStream.getPos();
+
+               try {
+                       final long secondaryPos = 
secondaryOutputStream.getPos();
+
+                       if (primaryPos != secondaryPos) {
+                               handleSecondaryStreamOnException(
+                                       new IOException("Stream positions are 
out of sync between primary stream and secondary stream. " +
+                                               "Reported positions are " + 
primaryPos + " (primary) and " + secondaryPos + " (secondary)."));
+                       }
+               } catch (Exception posEx) {
+                       handleSecondaryStreamOnException(posEx);
+               }
+       }
+
+       private void flushInternalBuffer() throws IOException {
+
+               if (bufferIdx > 0) {
+                       writeThroughInternal(buffer, 0, bufferIdx);
+                       bufferIdx = 0;
+               }
+       }
+
+       private void writeThroughInternal(byte[] b, int off, int len) throws 
IOException {
+
+               primaryOutputStream.write(b, off, len);
+
+               if (secondaryStreamException == null) {
+                       try {
+                               secondaryOutputStream.write(b, off, len);
+                       } catch (Exception writeEx) {
+                               handleSecondaryStreamOnException(writeEx);
+                       }
+               }
+       }
+
+       private void handleSecondaryStreamOnException(Exception ex) {
+
+               Preconditions.checkState(secondaryStreamException == null,
+                       "Secondary stream already failed from previous 
exception!");
+
+               try {
+                       secondaryOutputStream.close();
+               } catch (Exception closeEx) {
+                       ex = ExceptionUtils.firstOrSuppressed(closeEx, ex);
+               }
+
+               secondaryStreamException = Preconditions.checkNotNull(ex);
+       }
+
+       @Nullable
+       @Override
+       public StreamStateHandle closeAndGetHandle() throws IOException {
+               return closeAndGetPrimaryHandle();
+       }
+
+       /**
+        * Returns the state handle from the {@link #primaryOutputStream}.
+        */
+       public StreamStateHandle closeAndGetPrimaryHandle() throws IOException {
+               flushInternalBuffer();
+               return primaryOutputStream.closeAndGetHandle();
+       }
+
+       /**
+        * Returns the state handle from the {@link #secondaryOutputStream}. 
Also reports suppressed exceptions from earlier
+        * interactions with that stream.
+        */
+       public StreamStateHandle closeAndGetSecondaryHandle() throws 
IOException {
+               if (secondaryStreamException == null) {
+                       flushInternalBuffer();
+                       return secondaryOutputStream.closeAndGetHandle();
+               } else {
+                       throw new IOException("Secondary stream previously 
failed exceptionally", secondaryStreamException);
+               }
+       }
+
+       public Exception getSecondaryStreamException() {
+               return secondaryStreamException;
+       }
+
+       @VisibleForTesting
+       CheckpointStreamFactory.CheckpointStateOutputStream 
getPrimaryOutputStream() {
+               return primaryOutputStream;
+       }
+
+       @VisibleForTesting
+       CheckpointStreamFactory.CheckpointStateOutputStream 
getSecondaryOutputStream() {
+               return secondaryOutputStream;
+       }
+}
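
A minimal sketch of how the duplicating stream might be used, assuming the primary and secondary output streams are created elsewhere (e.g. by a stream factory and a local file-based stream):

    import org.apache.flink.runtime.state.CheckpointStreamFactory;
    import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream;
    import org.apache.flink.runtime.state.StreamStateHandle;

    import java.io.IOException;

    /** Sketch: duplicating one write path into a primary (remote) and a secondary (local) stream. */
    class DuplicatingStreamUsageSketch {

        StreamStateHandle writeDuplicated(
                CheckpointStreamFactory.CheckpointStateOutputStream primaryOut,
                CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut,
                byte[] stateBytes) throws IOException {

            DuplicatingCheckpointOutputStream duplicating =
                new DuplicatingCheckpointOutputStream(primaryOut, secondaryOut);

            // Writes pass through the internal buffer and are replayed into both wrapped streams.
            duplicating.write(stateBytes);

            // Failures on the primary stream are fatal for the snapshot and surface immediately.
            StreamStateHandle primaryHandle = duplicating.closeAndGetPrimaryHandle();

            // Suppressed secondary failures only surface here, so a broken local copy does not
            // fail the checkpoint as a whole; the handle would be reported as task-local state.
            StreamStateHandle secondaryHandle;
            try {
                secondaryHandle = duplicating.closeAndGetSecondaryHandle();
            } catch (IOException localEx) {
                secondaryHandle = null; // the local recovery copy is best-effort
            }

            return primaryHandle;
        }
    }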

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
new file mode 100644
index 0000000..f80a8ce
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * State handle for local copies of {@link IncrementalKeyedStateHandle}. Consists of a {@link DirectoryStateHandle} that
+ * represents the directory of the native RocksDB snapshot, the key groups, and a stream state handle for Flink's state
+ * meta data file.
+ */
+public class IncrementalLocalKeyedStateHandle extends 
DirectoryKeyedStateHandle {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Id of the checkpoint that created this state handle. */
+       @Nonnegative
+       private final long checkpointId;
+
+       /** UUID to identify the backend which created this state handle. */
+       @Nonnull
+       private final UUID backendIdentifier;
+
+       /** Handle to Flink's state meta data. */
+       @Nonnull
+       private final StreamStateHandle metaDataState;
+
+       /** Set with the ids of all shared state handles created by the 
checkpoint. */
+       @Nonnull
+       private final Set<StateHandleID> sharedStateHandleIDs;
+
+       public IncrementalLocalKeyedStateHandle(
+               @Nonnull UUID backendIdentifier,
+               @Nonnegative long checkpointId,
+               @Nonnull DirectoryStateHandle directoryStateHandle,
+               @Nonnull KeyGroupRange keyGroupRange,
+               @Nonnull StreamStateHandle metaDataState,
+               @Nonnull Set<StateHandleID> sharedStateHandleIDs) {
+
+               super(directoryStateHandle, keyGroupRange);
+               this.backendIdentifier = backendIdentifier;
+               this.checkpointId = checkpointId;
+               this.metaDataState = metaDataState;
+               this.sharedStateHandleIDs = sharedStateHandleIDs;
+       }
+
+       @Nonnull
+       public StreamStateHandle getMetaDataState() {
+               return metaDataState;
+       }
+
+       public long getCheckpointId() {
+               return checkpointId;
+       }
+
+       @Nonnull
+       public UUID getBackendIdentifier() {
+               return backendIdentifier;
+       }
+
+       @Nonnull
+       public Set<StateHandleID> getSharedStateHandleIDs() {
+               return sharedStateHandleIDs;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               if (!super.equals(o)) {
+                       return false;
+               }
+
+               IncrementalLocalKeyedStateHandle that = 
(IncrementalLocalKeyedStateHandle) o;
+
+               return getMetaDataState().equals(that.getMetaDataState());
+       }
+
+       @Override
+       public void discardState() throws Exception {
+
+               Exception collectedEx = null;
+
+               try {
+                       super.discardState();
+               } catch (Exception e) {
+                       collectedEx = e;
+               }
+
+               try {
+                       metaDataState.discardState();
+               } catch (Exception e) {
+                       collectedEx = ExceptionUtils.firstOrSuppressed(e, 
collectedEx);
+               }
+
+               if (collectedEx != null) {
+                       throw collectedEx;
+               }
+       }
+
+       @Override
+       public long getStateSize() {
+               return super.getStateSize() + metaDataState.getStateSize();
+       }
+
+       @Override
+       public int hashCode() {
+               int result = super.hashCode();
+               result = 31 * result + getMetaDataState().hashCode();
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return "IncrementalLocalKeyedStateHandle{" +
+                       "metaDataState=" + metaDataState +
+                       "} " + super.toString();
+       }
+}
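
A minimal sketch of assembling such a handle, assuming an illustrative local directory path, an in-memory meta data handle, and a single shared SST file id:

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.state.DirectoryStateHandle;
    import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
    import org.apache.flink.runtime.state.KeyGroupRange;
    import org.apache.flink.runtime.state.StateHandleID;
    import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;

    import java.util.Collections;
    import java.util.UUID;

    /** Sketch: assembling the task-local counterpart of an incremental RocksDB snapshot. */
    class IncrementalLocalHandleSketch {

        IncrementalLocalKeyedStateHandle build(long checkpointId, UUID backendId) {
            // Directory holding the native (local) RocksDB checkpoint files; path is illustrative.
            DirectoryStateHandle nativeCheckpointDir =
                new DirectoryStateHandle(new Path("file:///tmp/localstate/chk_" + checkpointId));

            // In-memory handle standing in for Flink's state meta data file.
            ByteStreamStateHandle metaData = new ByteStreamStateHandle("meta", new byte[]{1, 2, 3});

            return new IncrementalLocalKeyedStateHandle(
                backendId,
                checkpointId,
                nativeCheckpointDir,
                new KeyGroupRange(0, 31),
                metaData,
                Collections.singleton(new StateHandleID("000001.sst")));
        }
    }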

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index cbe40ee..3326a81 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.util.Disposable;
 
 import java.util.stream.Stream;
 
@@ -30,7 +31,7 @@ import java.util.stream.Stream;
  *
  * @param <K> The key by which state is keyed.
  */
-public interface KeyedStateBackend<K> extends InternalKeyContext<K> {
+public interface KeyedStateBackend<K> extends InternalKeyContext<K>, 
Disposable {
 
        /**
         * Sets the current key that is used for partitioned state.
@@ -102,8 +103,6 @@ public interface KeyedStateBackend<K> extends 
InternalKeyContext<K> {
                        TypeSerializer<N> namespaceSerializer,
                        StateDescriptor<S, ?> stateDescriptor) throws Exception;
 
-       /**
-        * Closes the backend and releases all resources.
-        */
+       @Override
        void dispose();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
new file mode 100644
index 0000000..c97fa0b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryConfig.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * This class encapsulates the completed configuration for local recovery, i.e. the root
+ * directories into which all file-based snapshots can be written and the general mode for the local recovery feature.
+ */
+public class LocalRecoveryConfig {
+
+       /**
+        * Enum over modes of local recovery:
+        * <p><ul>
+        * <li>DISABLED: disables local recovery.
+        * <li>ENABLE_FILE_BASED: enables local recovery in a variant that is 
based on local files.
+        * </ul>
+        */
+       public enum LocalRecoveryMode {
+               DISABLED,
+               ENABLE_FILE_BASED;
+
+               /**
+                * Extracts the {@link LocalRecoveryMode} from the given configuration. Defaults to LocalRecoveryMode.DISABLED
+                * if no configuration value is specified or parsing the value resulted in an exception.
+                *
+                * @param configuration the configuration that specifies the value for the local recovery mode.
+                * @return the local recovery mode as found in the config, or LocalRecoveryMode.DISABLED if no mode was
+                * configured or the specified mode could not be parsed.
+                */
+               @Nonnull
+               public static LocalRecoveryMode fromConfig(@Nonnull 
Configuration configuration) {
+                       String localRecoveryConfString = 
configuration.getString(CheckpointingOptions.LOCAL_RECOVERY);
+                       try {
+                               return 
LocalRecoveryConfig.LocalRecoveryMode.valueOf(localRecoveryConfString);
+                       } catch (IllegalArgumentException ex) {
+                               
LoggerFactory.getLogger(LocalRecoveryConfig.class).warn(
+                                       "Exception while parsing configuration 
of local recovery mode. Local recovery will be disabled.",
+                                       ex);
+                               return 
LocalRecoveryConfig.LocalRecoveryMode.DISABLED;
+                       }
+               }
+       }
+
+       /** The local recovery mode. */
+       @Nonnull
+       private final LocalRecoveryMode localRecoveryMode;
+
+       /** Encapsulates the root directories and the subtask-specific path. */
+       @Nonnull
+       private final LocalRecoveryDirectoryProvider localStateDirectories;
+
+       public LocalRecoveryConfig(
+               @Nonnull LocalRecoveryMode localRecoveryMode,
+               @Nonnull LocalRecoveryDirectoryProvider directoryProvider) {
+               this.localRecoveryMode = localRecoveryMode;
+               this.localStateDirectories = directoryProvider;
+       }
+
+       @Nonnull
+       public LocalRecoveryMode getLocalRecoveryMode() {
+               return localRecoveryMode;
+       }
+
+       @Nonnull
+       public LocalRecoveryDirectoryProvider getLocalStateDirectoryProvider() {
+               return localStateDirectories;
+       }
+
+       @Override
+       public String toString() {
+               return "LocalRecoveryConfig{" +
+                       "localRecoveryMode=" + localRecoveryMode +
+                       ", localStateDirectories=" + localStateDirectories +
+                       '}';
+       }
+}
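
A minimal sketch of wiring the configuration into this class, assuming the directory provider is built elsewhere:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.LocalRecoveryConfig;
    import org.apache.flink.runtime.state.LocalRecoveryConfig.LocalRecoveryMode;
    import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;

    /** Sketch: deriving the local recovery mode from the configuration. */
    class LocalRecoveryConfigSketch {

        LocalRecoveryConfig fromConfiguration(
                Configuration configuration,
                LocalRecoveryDirectoryProvider directoryProvider) {

            // Missing or unparsable values fall back to DISABLED (with a warning) instead of failing.
            LocalRecoveryMode mode = LocalRecoveryMode.fromConfig(configuration);

            return new LocalRecoveryConfig(mode, directoryProvider);
        }
    }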

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java
new file mode 100644
index 0000000..3f7ab5c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.File;
+import java.io.Serializable;
+
+/**
+ * Provides directories for local recovery. It offers access to the allocation 
base directories (i.e. the root
+ * directories for all local state that is created under the same allocation 
id) and the subtask-specific paths, which
+ * contain the local state for one subtask. Access by checkpoint id rotates over all root directory indexes, in case
+ * there is more than one. Selection methods are provided to pick the directory under a certain index. Directory
+ * structures are of the following shape:
+ *
+ * <p><blockquote><pre>
+ * |-----allocationBaseDirectory------|
+ * |-----subtaskBaseDirectory--------------------------------------|
+ * |-----subtaskSpecificCheckpointDirectory------------------------------|
+ *
+ * 
../local_state_root_1/allocation_id/job_id/vertex_id_subtask_idx/chk_1/(state)
+ * 
../local_state_root_2/allocation_id/job_id/vertex_id_subtask_idx/chk_2/(state)
+ *
+ * (...)
+ * </pre></blockquote><p>
+ */
+public interface LocalRecoveryDirectoryProvider extends Serializable {
+
+       /**
+        * Returns the local state allocation base directory for the given checkpoint id w.r.t. our rotation
+        * over all available allocation base directories.
+        */
+       File allocationBaseDirectory(long checkpointId);
+
+       /**
+        * Returns the local state directory for the owning subtask and the given checkpoint id w.r.t. our rotation
+        * over all available allocation base directories. This directory is contained in the directory returned by
+        * {@link #allocationBaseDirectory(long)} for the same checkpoint id.
+        */
+       File subtaskBaseDirectory(long checkpointId);
+
+       /**
+        * Returns the local state directory for the specific operator subtask 
and the given checkpoint id w.r.t. our
+        * rotation over all available root dirs. This directory is contained 
in the directory returned by
+        * {@link #subtaskBaseDirectory(long)} for the same checkpoint id.
+        */
+       File subtaskSpecificCheckpointDirectory(long checkpointId);
+
+       /**
+        * Returns a specific allocation base directory. The index must be 
between 0 (incl.) and
+        * {@link #allocationBaseDirsCount()} (excl.).
+        */
+       File selectAllocationBaseDirectory(int idx);
+
+       /**
+        * Returns a specific subtask base directory. The index must be between 0 (incl.) and
+        * {@link #allocationBaseDirsCount()} (excl.). This directory is a direct child of
+        * {@link #selectAllocationBaseDirectory(int)} for the same index.
+        */
+       File selectSubtaskBaseDirectory(int idx);
+
+       /**
+        * Returns the total number of allocation base directories.
+        */
+       int allocationBaseDirsCount();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java
new file mode 100644
index 0000000..ef50929
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImpl.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.Arrays;
+
+/**
+ * Implementation of {@link LocalRecoveryDirectoryProvider}.
+ */
+public class LocalRecoveryDirectoryProviderImpl implements 
LocalRecoveryDirectoryProvider {
+
+       /** Serial version. */
+       private static final long serialVersionUID = 1L;
+
+       /** Logger for this class. */
+       private static final Logger LOG = 
LoggerFactory.getLogger(LocalRecoveryDirectoryProviderImpl.class);
+
+       /** All available root directories that this can potentially deliver. */
+       @Nonnull
+       private final File[] allocationBaseDirs;
+
+       /** JobID of the owning job. */
+       @Nonnull
+       private final JobID jobID;
+
+       /** JobVertexID of the owning task. */
+       @Nonnull
+       private final JobVertexID jobVertexID;
+
+       /** Index of the owning subtask. */
+       @Nonnegative
+       private final int subtaskIndex;
+
+       public LocalRecoveryDirectoryProviderImpl(
+               File allocationBaseDir,
+               @Nonnull JobID jobID,
+               @Nonnull JobVertexID jobVertexID,
+               @Nonnegative int subtaskIndex) {
+               this(new File[]{allocationBaseDir}, jobID, jobVertexID, 
subtaskIndex);
+       }
+
+       public LocalRecoveryDirectoryProviderImpl(
+               @Nonnull File[] allocationBaseDirs,
+               @Nonnull JobID jobID,
+               @Nonnull JobVertexID jobVertexID,
+               @Nonnegative int subtaskIndex) {
+
+               Preconditions.checkArgument(allocationBaseDirs.length > 0);
+               this.allocationBaseDirs = allocationBaseDirs;
+               this.jobID = jobID;
+               this.jobVertexID = jobVertexID;
+               this.subtaskIndex = subtaskIndex;
+
+               for (File allocationBaseDir : allocationBaseDirs) {
+                       Preconditions.checkNotNull(allocationBaseDir);
+                       allocationBaseDir.mkdirs();
+               }
+       }
+
+       @Override
+       public File allocationBaseDirectory(long checkpointId) {
+               return selectAllocationBaseDirectory((((int) checkpointId) & 
Integer.MAX_VALUE) % allocationBaseDirs.length);
+       }
+
+       @Override
+       public File subtaskBaseDirectory(long checkpointId) {
+               return new File(allocationBaseDirectory(checkpointId), 
subtaskDirString());
+       }
+
+       @Override
+       public File subtaskSpecificCheckpointDirectory(long checkpointId) {
+               return new File(subtaskBaseDirectory(checkpointId), 
checkpointDirString(checkpointId));
+       }
+
+       @Override
+       public File selectAllocationBaseDirectory(int idx) {
+               return allocationBaseDirs[idx];
+       }
+
+       @Override
+       public File selectSubtaskBaseDirectory(int idx) {
+               return new File(selectAllocationBaseDirectory(idx), 
subtaskDirString());
+       }
+
+       @Override
+       public int allocationBaseDirsCount() {
+               return allocationBaseDirs.length;
+       }
+
+       @Override
+       public String toString() {
+               return "LocalRecoveryDirectoryProvider{" +
+                       "rootDirectories=" + 
Arrays.toString(allocationBaseDirs) +
+                       ", jobID=" + jobID +
+                       ", jobVertexID=" + jobVertexID +
+                       ", subtaskIndex=" + subtaskIndex +
+                       '}';
+       }
+
+       @VisibleForTesting
+       String subtaskDirString() {
+               return Paths.get("jid_" + jobID, "vtx_" + jobVertexID + "_sti_" 
+ subtaskIndex).toString();
+       }
+
+       @VisibleForTesting
+       String checkpointDirString(long checkpointId) {
+               return "chk_" + checkpointId;
+       }
+}
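
A resolution sketch (editor's illustration, not part of the commit) for the rotation implemented above: with two allocation base directories, checkpoint 17 maps to index 17 % 2 = 1, and the subtask-specific checkpoint directory is nested underneath according to subtaskDirString() and checkpointDirString(). Paths, IDs and the subtask index are placeholders.

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;

import java.io.File;

public class DirectoryProviderExample {

    public static void main(String[] args) {
        File[] allocationBaseDirs = {
            new File("/data/local_state_root_1"),
            new File("/data/local_state_root_2")
        };

        LocalRecoveryDirectoryProvider provider = new LocalRecoveryDirectoryProviderImpl(
            allocationBaseDirs, new JobID(), new JobVertexID(), 3);

        // Checkpoint 17 rotates to allocationBaseDirs[17 % 2], i.e. local_state_root_2.
        File chk17 = provider.subtaskSpecificCheckpointDirectory(17L);
        // Roughly: /data/local_state_root_2/jid_<jobId>/vtx_<vertexId>_sti_3/chk_17
        System.out.println(chk17);
    }
}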

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
deleted file mode 100644
index 1960c1c..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.core.fs.AbstractMultiFSDataInputStream;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Wrapper class that takes multiple {@link StreamStateHandle} and makes them 
look like a single one. This is done by
- * providing a contiguous view on all the streams of the inner handles through 
a wrapper stream and by summing up all
- * all the meta data.
- */
-public class MultiStreamStateHandle implements StreamStateHandle {
-
-       private static final long serialVersionUID = -4588701089489569707L;
-       private final List<StreamStateHandle> stateHandles;
-       private final long stateSize;
-
-       public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) {
-               this.stateHandles = Preconditions.checkNotNull(stateHandles);
-               long calculateSize = 0L;
-               for(StreamStateHandle stateHandle : stateHandles) {
-                       calculateSize += stateHandle.getStateSize();
-               }
-               this.stateSize = calculateSize;
-       }
-
-       @Override
-       public FSDataInputStream openInputStream() throws IOException {
-               return new MultiFSDataInputStream(stateHandles);
-       }
-
-       @Override
-       public void discardState() throws Exception {
-               StateUtil.bestEffortDiscardAllStateObjects(stateHandles);
-       }
-
-       @Override
-       public long getStateSize() {
-               return stateSize;
-       }
-
-       @Override
-       public String toString() {
-               return "MultiStreamStateHandle{" +
-                       "stateHandles=" + stateHandles +
-                       ", stateSize=" + stateSize +
-                       '}';
-       }
-
-       static final class MultiFSDataInputStream extends 
AbstractMultiFSDataInputStream {
-
-               private final TreeMap<Long, StreamStateHandle> stateHandleMap;
-
-               public MultiFSDataInputStream(List<StreamStateHandle> 
stateHandles) throws IOException {
-                       this.stateHandleMap = new TreeMap<>();
-                       this.totalPos = 0L;
-                       long calculateSize = 0L;
-                       for (StreamStateHandle stateHandle : stateHandles) {
-                               stateHandleMap.put(calculateSize, stateHandle);
-                               calculateSize += stateHandle.getStateSize();
-                       }
-                       this.totalAvailable = calculateSize;
-
-                       if (totalAvailable > 0L) {
-                               StreamStateHandle first = 
stateHandleMap.firstEntry().getValue();
-                               delegate = first.openInputStream();
-                       }
-               }
-
-               @Override
-               protected FSDataInputStream getSeekedStreamForOffset(long 
globalStreamOffset) throws IOException {
-                       Map.Entry<Long, StreamStateHandle> handleEntry = 
stateHandleMap.floorEntry(globalStreamOffset);
-                       if (handleEntry != null) {
-                               FSDataInputStream stream = 
handleEntry.getValue().openInputStream();
-                               stream.seek(globalStreamOffset - 
handleEntry.getKey());
-                               return stream;
-                       }
-                       return null;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
index aee5226..3cbb351 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
@@ -19,19 +19,22 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.util.Disposable;
 
 import java.io.Closeable;
+import java.util.Collection;
 
 /**
  * Interface that combines both, the user facing {@link OperatorStateStore} 
interface and the system interface
  * {@link Snapshotable}
  *
  */
-public interface OperatorStateBackend extends OperatorStateStore, 
Snapshotable<OperatorStateHandle>, Closeable {
+public interface OperatorStateBackend extends
+       OperatorStateStore,
+       Snapshotable<SnapshotResult<OperatorStateHandle>, 
Collection<OperatorStateHandle>>,
+       Closeable,
+       Disposable {
 
-       /**
-        * Disposes the backend and releases all resources.
-        */
+       @Override
        void dispose();
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
index 036aed0..ba28631 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java
@@ -71,14 +71,14 @@ public final class OperatorStateCheckpointOutputStream
                OperatorStateHandle.StateMetaInfo metaInfo =
                                new OperatorStateHandle.StateMetaInfo(
                                                partitionOffsets.toArray(),
-                                               
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
+                                       
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
 
                
offsetsMap.put(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, 
metaInfo);
 
-               return new OperatorStateHandle(offsetsMap, streamStateHandle);
+               return new OperatorStreamStateHandle(offsetsMap, 
streamStateHandle);
        }
 
        public int getNumberOfPartitions() {
                return partitionOffsets.size();
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
index f9427ef..1ebdaff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
@@ -27,107 +27,39 @@ import java.util.Arrays;
 import java.util.Map;
 
 /**
- * State handle for partitionable operator state. Besides being a {@link 
StreamStateHandle}, this also provides a
- * map that contains the offsets to the partitions of named states in the 
stream.
+ * Interface of a state handle for operator state.
  */
-public class OperatorStateHandle implements StreamStateHandle {
+public interface OperatorStateHandle extends StreamStateHandle {
 
        /**
-        * The modes that determine how an {@link OperatorStateHandle} is 
assigned to tasks during restore.
+        * Returns a map of meta data for all contained states by their name.
         */
-       public enum Mode {
-               SPLIT_DISTRIBUTE,       // The operator state partitions in the 
state handle are split and distributed to one task each.
-               UNION,                          // The operator state 
partitions are UNION-ed upon restoring and sent to all tasks.
-               BROADCAST                       // The operator states are 
identical, as the state is produced from a broadcast stream.
-       }
-
-       private static final long serialVersionUID = 35876522969227335L;
+       Map<String, StateMetaInfo> getStateNameToPartitionOffsets();
 
        /**
-        * unique state name -> offsets for available partitions in the handle 
stream
+        * Returns an input stream to read the operator state information.
         */
-       private final Map<String, StateMetaInfo> stateNameToPartitionOffsets;
-       private final StreamStateHandle delegateStateHandle;
-
-       public OperatorStateHandle(
-                       Map<String, StateMetaInfo> stateNameToPartitionOffsets,
-                       StreamStateHandle delegateStateHandle) {
-
-               this.delegateStateHandle = 
Preconditions.checkNotNull(delegateStateHandle);
-               this.stateNameToPartitionOffsets = 
Preconditions.checkNotNull(stateNameToPartitionOffsets);
-       }
-
-       public Map<String, StateMetaInfo> getStateNameToPartitionOffsets() {
-               return stateNameToPartitionOffsets;
-       }
-
-       @Override
-       public void discardState() throws Exception {
-               delegateStateHandle.discardState();
-       }
-
-       @Override
-       public long getStateSize() {
-               return delegateStateHandle.getStateSize();
-       }
-
        @Override
-       public FSDataInputStream openInputStream() throws IOException {
-               return delegateStateHandle.openInputStream();
-       }
-
-       public StreamStateHandle getDelegateStateHandle() {
-               return delegateStateHandle;
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-
-               if (!(o instanceof OperatorStateHandle)) {
-                       return false;
-               }
-
-               OperatorStateHandle that = (OperatorStateHandle) o;
+       FSDataInputStream openInputStream() throws IOException;
 
-               if (stateNameToPartitionOffsets.size() != 
that.stateNameToPartitionOffsets.size()) {
-                       return false;
-               }
-
-               for (Map.Entry<String, StateMetaInfo> entry : 
stateNameToPartitionOffsets.entrySet()) {
-                       if 
(!entry.getValue().equals(that.stateNameToPartitionOffsets.get(entry.getKey())))
 {
-                               return false;
-                       }
-               }
-
-               return delegateStateHandle.equals(that.delegateStateHandle);
-       }
-
-       @Override
-       public int hashCode() {
-               int result = delegateStateHandle.hashCode();
-               for (Map.Entry<String, StateMetaInfo> entry : 
stateNameToPartitionOffsets.entrySet()) {
-
-                       int entryHash = entry.getKey().hashCode();
-                       if (entry.getValue() != null) {
-                               entryHash += entry.getValue().hashCode();
-                       }
-                       result = 31 * result + entryHash;
-               }
-               return result;
-       }
+       /**
+        * Returns the underlying stream state handle that points to the state 
data.
+        */
+       StreamStateHandle getDelegateStateHandle();
 
-       @Override
-       public String toString() {
-               return "OperatorStateHandle{" +
-                               "stateNameToPartitionOffsets=" + 
stateNameToPartitionOffsets +
-                               ", delegateStateHandle=" + delegateStateHandle +
-                               '}';
+       /**
+        * The modes that determine how an {@link OperatorStreamStateHandle} is 
assigned to tasks during restore.
+        */
+       enum Mode {
+               SPLIT_DISTRIBUTE,       // The operator state partitions in the 
state handle are split and distributed to one task each.
+               UNION,                          // The operator state 
partitions are UNION-ed upon restoring and sent to all tasks.
+               BROADCAST                       // The operator states are 
identical, as the state is produced from a broadcast stream.
        }
 
-       public static class StateMetaInfo implements Serializable {
+       /**
+        * Meta information about the operator state handle.
+        */
+       class StateMetaInfo implements Serializable {
 
                private static final long serialVersionUID = 
3593817615858941166L;
 
@@ -158,10 +90,8 @@ public class OperatorStateHandle implements 
StreamStateHandle {
 
                        StateMetaInfo that = (StateMetaInfo) o;
 
-                       if (!Arrays.equals(getOffsets(), that.getOffsets())) {
-                               return false;
-                       }
-                       return getDistributionMode() == 
that.getDistributionMode();
+                       return Arrays.equals(getOffsets(), that.getOffsets())
+                               && getDistributionMode() == 
that.getDistributionMode();
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
new file mode 100644
index 0000000..3900834
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * State handle for partitionable operator state. Besides being a {@link 
StreamStateHandle}, this also provides a
+ * map that contains the offsets to the partitions of named states in the 
stream.
+ */
+public class OperatorStreamStateHandle implements OperatorStateHandle {
+
+       private static final long serialVersionUID = 35876522969227335L;
+
+       /**
+        * unique state name -> offsets for available partitions in the handle 
stream
+        */
+       private final Map<String, StateMetaInfo> stateNameToPartitionOffsets;
+       private final StreamStateHandle delegateStateHandle;
+
+       public OperatorStreamStateHandle(
+                       Map<String, StateMetaInfo> stateNameToPartitionOffsets,
+                       StreamStateHandle delegateStateHandle) {
+
+               this.delegateStateHandle = 
Preconditions.checkNotNull(delegateStateHandle);
+               this.stateNameToPartitionOffsets = 
Preconditions.checkNotNull(stateNameToPartitionOffsets);
+       }
+
+       @Override
+       public Map<String, StateMetaInfo> getStateNameToPartitionOffsets() {
+               return stateNameToPartitionOffsets;
+       }
+
+       @Override
+       public void discardState() throws Exception {
+               delegateStateHandle.discardState();
+       }
+
+       @Override
+       public long getStateSize() {
+               return delegateStateHandle.getStateSize();
+       }
+
+       @Override
+       public FSDataInputStream openInputStream() throws IOException {
+               return delegateStateHandle.openInputStream();
+       }
+
+       @Override
+       public StreamStateHandle getDelegateStateHandle() {
+               return delegateStateHandle;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+
+               if (!(o instanceof OperatorStreamStateHandle)) {
+                       return false;
+               }
+
+               OperatorStreamStateHandle that = (OperatorStreamStateHandle) o;
+
+               if (stateNameToPartitionOffsets.size() != 
that.stateNameToPartitionOffsets.size()) {
+                       return false;
+               }
+
+               for (Map.Entry<String, StateMetaInfo> entry : 
stateNameToPartitionOffsets.entrySet()) {
+                       if 
(!entry.getValue().equals(that.stateNameToPartitionOffsets.get(entry.getKey())))
 {
+                               return false;
+                       }
+               }
+
+               return delegateStateHandle.equals(that.delegateStateHandle);
+       }
+
+       @Override
+       public int hashCode() {
+               int result = delegateStateHandle.hashCode();
+               for (Map.Entry<String, StateMetaInfo> entry : 
stateNameToPartitionOffsets.entrySet()) {
+
+                       int entryHash = entry.getKey().hashCode();
+                       if (entry.getValue() != null) {
+                               entryHash += entry.getValue().hashCode();
+                       }
+                       result = 31 * result + entryHash;
+               }
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return "OperatorStateHandle{" +
+                               "stateNameToPartitionOffsets=" + 
stateNameToPartitionOffsets +
+                               ", delegateStateHandle=" + delegateStateHandle +
+                               '}';
+       }
+}
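
A construction sketch (editor's illustration, not part of the commit) for the interface/implementation split above; the state name "bufferedElements", the offsets and the in-memory ByteStreamStateHandle payload are assumptions made for the example.

import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;

import java.util.HashMap;
import java.util.Map;

public class OperatorStreamStateHandleExample {

    public static OperatorStateHandle build(byte[] serializedPartitions) {
        // Two partitions of the hypothetical state "bufferedElements" start at offsets 0 and 128.
        Map<String, OperatorStateHandle.StateMetaInfo> offsets = new HashMap<>();
        offsets.put("bufferedElements", new OperatorStateHandle.StateMetaInfo(
            new long[] {0L, 128L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));

        // The delegate stream state handle owns the actual bytes; here an in-memory handle.
        StreamStateHandle delegate = new ByteStreamStateHandle("bufferedElements-handle", serializedPartitions);
        return new OperatorStreamStateHandle(offsets, delegate);
    }
}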

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
new file mode 100644
index 0000000..3c0054f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class represents a directory that is the target for a state snapshot. This class provides methods that
+ * simplify resource management when dealing with such directories, e.g. it can produce a {@link DirectoryStateHandle}
+ * when the snapshot is completed and disposal considers whether or not a 
snapshot was already completed. For a
+ * completed snapshot, the ownership for cleanup is transferred to the created 
directory state handle. For incomplete
+ * snapshots, calling {@link #cleanup()} will delete the underlying directory 
resource.
+ */
+public abstract class SnapshotDirectory {
+
+       /**
+        * Lifecycle stages of a snapshot directory.
+        */
+       enum State {
+               ONGOING, COMPLETED, DELETED
+       }
+
+       /** This path describes the underlying directory for the snapshot. */
+       @Nonnull
+       protected final Path directory;
+
+       /** The filesystem that contains the snapshot directory. */
+       @Nonnull
+       protected final FileSystem fileSystem;
+
+       /** This reference tracks the lifecycle state of the snapshot 
directory. */
+       @Nonnull
+       protected AtomicReference<State> state;
+
+       private SnapshotDirectory(@Nonnull Path directory, @Nonnull FileSystem 
fileSystem) {
+               this.directory = directory;
+               this.fileSystem = fileSystem;
+               this.state = new AtomicReference<>(State.ONGOING);
+       }
+
+       private SnapshotDirectory(@Nonnull Path directory) throws IOException {
+               this(directory, directory.getFileSystem());
+       }
+
+       @Nonnull
+       public Path getDirectory() {
+               return directory;
+       }
+
+       public boolean mkdirs() throws IOException {
+               return fileSystem.mkdirs(directory);
+       }
+
+       @Nonnull
+       public FileSystem getFileSystem() {
+               return fileSystem;
+       }
+
+       public boolean exists() throws IOException {
+               return fileSystem.exists(directory);
+       }
+
+       /**
+        * List the statuses of the files/directories in the snapshot directory.
+        *
+        * @return the statuses of the files/directories in the given path.
+        * @throws IOException if there is a problem creating the file statuses.
+        */
+       public FileStatus[] listStatus() throws IOException {
+               return fileSystem.listStatus(directory);
+       }
+
+       /**
+        * Calling this method will attempt to delete the underlying snapshot directory recursively, if the state is
+        * "ongoing". In this case, the state will be set to "deleted" as a 
result of this call.
+        *
+        * @return <code>true</code> if delete is successful, 
<code>false</code> otherwise.
+        * @throws IOException if an exception happens during the delete.
+        */
+       public boolean cleanup() throws IOException {
+               return !state.compareAndSet(State.ONGOING, State.DELETED) || 
fileSystem.delete(directory, true);
+       }
+
+       /**
+        * Returns <code>true</code> if the snapshot is marked as completed.
+        */
+       public boolean isSnapshotCompleted() {
+               return State.COMPLETED == state.get();
+       }
+
+       /**
+        * Calling this method completes the snapshot for this snapshot directory, if possible, and creates a corresponding
+        * {@link DirectoryStateHandle} that points to the snapshot directory. Calling this method can change the
+        * lifecycle state from ONGOING to COMPLETED if the directory should no longer be deleted in {@link #cleanup()}.
+        * This method can return <code>null</code> if the directory is temporary and should therefore not be
+        * referenced in a handle.
+        *
+        * @return A directory state handle that points to the snapshot directory, or <code>null</code> if the
+        * directory is temporary and should therefore not be referenced in a handle.
+        * @throws IOException if the state of this snapshot directory object 
is different from "ongoing".
+        */
+       @Nullable
+       public abstract DirectoryStateHandle completeSnapshotAndGetHandle() 
throws IOException;
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               SnapshotDirectory that = (SnapshotDirectory) o;
+
+               return directory.equals(that.directory);
+       }
+
+       @Override
+       public int hashCode() {
+               return directory.hashCode();
+       }
+
+       @Override
+       public String toString() {
+               return "SnapshotDirectory{" +
+                       "directory=" + directory +
+                       ", state=" + state +
+                       '}';
+       }
+
+       /**
+        * Creates a temporary snapshot directory for the given path. This will 
always return "null" as result of
+        * {@link #completeSnapshotAndGetHandle()} and always attempt to delete 
the underlying directory in
+        * {@link #cleanup()}.
+        */
+       public static SnapshotDirectory temporary(@Nonnull Path directory) 
throws IOException {
+               return new TemporarySnapshotDirectory(directory);
+       }
+
+       /**
+        * Creates a permanent snapshot directory for the given path, which 
will not delete the underlying directory in
+        * {@link #cleanup()} after {@link #completeSnapshotAndGetHandle()} was 
called.
+        */
+       public static SnapshotDirectory permanent(@Nonnull Path directory) 
throws IOException {
+               return new PermanentSnapshotDirectory(directory);
+       }
+
+       private static class TemporarySnapshotDirectory extends 
SnapshotDirectory {
+
+               TemporarySnapshotDirectory(@Nonnull Path directory) throws 
IOException {
+                       super(directory);
+               }
+
+               @Override
+               public DirectoryStateHandle completeSnapshotAndGetHandle() {
+                       return null; // We return null so that the directory is not referenced by a state handle.
+               }
+       }
+
+       private static class PermanentSnapshotDirectory extends 
SnapshotDirectory {
+
+               PermanentSnapshotDirectory(@Nonnull Path directory) throws 
IOException {
+                       super(directory);
+               }
+
+               @Override
+               public DirectoryStateHandle completeSnapshotAndGetHandle() 
throws IOException {
+                       if (State.COMPLETED == state.get() || 
state.compareAndSet(State.ONGOING, State.COMPLETED)) {
+                               return new DirectoryStateHandle(directory);
+                       } else {
+                               throw new IOException("Expected state " + 
State.ONGOING + " but found state " + state.get());
+                       }
+               }
+       }
+}
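
A lifecycle sketch (editor's illustration, not part of the commit) for the permanent variant: once the snapshot content is written, completing the snapshot transfers ownership of the directory to the returned DirectoryStateHandle; if the snapshot fails before completion, cleanup() deletes the directory instead. The target path is a placeholder.

import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.SnapshotDirectory;

import java.io.IOException;

public class SnapshotDirectoryExample {

    public static DirectoryStateHandle snapshotInto(Path target) throws IOException {
        SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(target);
        snapshotDirectory.mkdirs();
        try {
            // ... write the snapshot files below snapshotDirectory.getDirectory() ...

            // Completing flips ONGOING to COMPLETED; cleanup() will no longer delete the directory.
            return snapshotDirectory.completeSnapshotAndGetHandle();
        } catch (IOException e) {
            // Incomplete snapshot: cleanup() deletes the directory recursively.
            snapshotDirectory.cleanup();
            throw e;
        }
    }
}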

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotResult.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotResult.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotResult.java
new file mode 100644
index 0000000..e292c73
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotResult.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * This class contains the combined results from the snapshot of a state 
backend:
+ * <ul>
+ *   <li>A state object representing the state that will be reported to the 
Job Manager to acknowledge the checkpoint.</li>
+ *   <li>A state object that represents the state for the {@link 
TaskLocalStateStoreImpl}.</li>
+ * </ul>
+ *
+ * Both state objects are optional and can be null, e.g. if there was no state to snapshot in the backend. A non-null
+ * local state object also requires a non-null state to report to the job manager, because the job manager always
+ * owns the ground truth about the checkpointed state.
+ */
+public class SnapshotResult<T extends StateObject> implements StateObject {
+
+       private static final long serialVersionUID = 1L;
+
+       /** A singleton instance to represent an empty snapshot result. */
+       private static final SnapshotResult<?> EMPTY = new 
SnapshotResult<>(null, null);
+
+       /** This is the state snapshot that will be reported to the Job Manager 
to acknowledge a checkpoint. */
+       private final T jobManagerOwnedSnapshot;
+
+       /** This is the state snapshot that will be stored on the task manager and used for task-local recovery. */
+       private final T taskLocalSnapshot;
+
+       /**
+        * Creates a {@link SnapshotResult} for the given 
jobManagerOwnedSnapshot and taskLocalSnapshot. If the
+        * jobManagerOwnedSnapshot is null, taskLocalSnapshot must also be null.
+        *
+        * @param jobManagerOwnedSnapshot Snapshot to report to the job manager. Can be null.
+        * @param taskLocalSnapshot Snapshot to report to the local state store. This is optional and requires
+        *                             jobManagerOwnedSnapshot to be non-null if this is non-null.
+        */
+       private SnapshotResult(T jobManagerOwnedSnapshot, T taskLocalSnapshot) {
+
+               if (jobManagerOwnedSnapshot == null && taskLocalSnapshot != 
null) {
+                       throw new IllegalStateException("Cannot report local 
state snapshot without corresponding remote state!");
+               }
+
+               this.jobManagerOwnedSnapshot = jobManagerOwnedSnapshot;
+               this.taskLocalSnapshot = taskLocalSnapshot;
+       }
+
+       public T getJobManagerOwnedSnapshot() {
+               return jobManagerOwnedSnapshot;
+       }
+
+       public T getTaskLocalSnapshot() {
+               return taskLocalSnapshot;
+       }
+
+       @Override
+       public void discardState() throws Exception {
+
+               Exception aggregatedExceptions = null;
+
+               if (jobManagerOwnedSnapshot != null) {
+                       try {
+                               jobManagerOwnedSnapshot.discardState();
+                       } catch (Exception remoteDiscardEx) {
+                               aggregatedExceptions = remoteDiscardEx;
+                       }
+               }
+
+               if (taskLocalSnapshot != null) {
+                       try {
+                               taskLocalSnapshot.discardState();
+                       } catch (Exception localDiscardEx) {
+                               aggregatedExceptions = 
ExceptionUtils.firstOrSuppressed(localDiscardEx, aggregatedExceptions);
+                       }
+               }
+
+               if (aggregatedExceptions != null) {
+                       throw aggregatedExceptions;
+               }
+       }
+
+       @Override
+       public long getStateSize() {
+               return jobManagerOwnedSnapshot != null ? 
jobManagerOwnedSnapshot.getStateSize() : 0L;
+       }
+
+       @SuppressWarnings("unchecked")
+       public static <T extends StateObject> SnapshotResult<T> empty() {
+               return (SnapshotResult<T>) EMPTY;
+       }
+
+       public static <T extends StateObject> SnapshotResult<T> of(@Nullable T 
jobManagerState) {
+               return jobManagerState != null ? new 
SnapshotResult<>(jobManagerState, null) : empty();
+       }
+
+       public static <T extends StateObject> SnapshotResult<T> withLocalState(
+               @Nonnull T jobManagerState,
+               @Nonnull T localState) {
+               return new SnapshotResult<>(jobManagerState, localState);
+       }
+}
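
A sketch (editor's illustration, not part of the commit) of how the factory methods encode the invariant stated above: a task-local snapshot is only valid together with a job-manager-owned snapshot, and a missing job manager snapshot collapses to the empty result.

import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;

import javax.annotation.Nullable;

public class SnapshotResultExample {

    public static SnapshotResult<KeyedStateHandle> report(
            @Nullable KeyedStateHandle jobManagerHandle,
            @Nullable KeyedStateHandle taskLocalHandle) {

        if (jobManagerHandle == null) {
            // Without a primary snapshot there is nothing to report, not even a local copy.
            return SnapshotResult.empty();
        }
        return taskLocalHandle != null
            ? SnapshotResult.withLocalState(jobManagerHandle, taskLocalHandle)
            : SnapshotResult.of(jobManagerHandle);
    }
}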

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
new file mode 100644
index 0000000..9139fa7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Interface for different snapshot approaches in state backends. Implementing classes should ideally be stateless or at
+ * least thread-safe, i.e. this is a functional interface that can be called in parallel by multiple checkpoints.
+ *
+ * @param <S> type of the returned state object that represents the result of 
the snapshot operation.
+ */
+@FunctionalInterface
+public interface SnapshotStrategy<S extends StateObject> {
+
+       /**
+        * Operation that writes a snapshot into a stream that is provided by 
the given {@link CheckpointStreamFactory} and
+        * returns a {@link RunnableFuture} that gives a state handle to the snapshot. It is up to the implementation
+        * whether the operation is performed synchronously or asynchronously. In the latter case, the returned
+        * Runnable must be executed before obtaining the handle.
+        *
+        * @param checkpointId      The ID of the checkpoint.
+        * @param timestamp         The timestamp of the checkpoint.
+        * @param streamFactory     The factory that we can use for writing our 
state to streams.
+        * @param checkpointOptions Options for how to perform this checkpoint.
+        * @return A runnable future that will yield a {@link StateObject}.
+        */
+       RunnableFuture<S> performSnapshot(
+               long checkpointId,
+               long timestamp,
+               CheckpointStreamFactory streamFactory,
+               CheckpointOptions checkpointOptions) throws Exception;
+}
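
Because SnapshotStrategy is a @FunctionalInterface, a backend with nothing to write can express its strategy as a lambda. A minimal sketch (editor's illustration; the use of the runtime's DoneFuture as an already-completed RunnableFuture is an assumption, not something prescribed by this commit):

import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;

public class SnapshotStrategyExample {

    // A strategy with no state to snapshot; it completes immediately with an empty result.
    static final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> EMPTY_STRATEGY =
        (checkpointId, timestamp, streamFactory, checkpointOptions) ->
            new DoneFuture<SnapshotResult<KeyedStateHandle>>(SnapshotResult.empty());
}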

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
index c7e62f0..733339f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
@@ -20,15 +20,15 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 
-import java.util.Collection;
 import java.util.concurrent.RunnableFuture;
 
 /**
  * Interface for operators that can perform snapshots of their state.
  *
  * @param <S> Generic type of the state object that is created as handle to 
snapshots.
+ * @param <R> Generic type of the state object that is used in restore.
  */
-public interface Snapshotable<S extends StateObject> {
+public interface Snapshotable<S extends StateObject, R> {
 
        /**
         * Operation that writes a snapshot into a stream that is provided by 
the given {@link CheckpointStreamFactory} and
@@ -54,5 +54,5 @@ public interface Snapshotable<S extends StateObject> {
         *
         * @param state the old state to restore.
         */
-       void restore(Collection<S> state) throws Exception;
+       void restore(R state) throws Exception;
 }
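
To make the new type parameters concrete, a sketch (editor's illustration, not part of the commit; it assumes the snapshot(...) method declared by Snapshotable keeps its four-argument signature): a keyed backend now restores from a Collection<KeyedStateHandle>, while its snapshot is reported as a SnapshotResult<KeyedStateHandle>.

import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.Snapshotable;

import java.util.Collection;
import java.util.concurrent.RunnableFuture;

public class SnapshotableExample {

    public static SnapshotResult<KeyedStateHandle> restoreAndSnapshot(
            Snapshotable<SnapshotResult<KeyedStateHandle>, Collection<KeyedStateHandle>> backend,
            Collection<KeyedStateHandle> previousState,
            CheckpointStreamFactory streamFactory,
            CheckpointOptions checkpointOptions) throws Exception {

        // R = Collection<KeyedStateHandle>: restore from the handles of the previous checkpoint.
        backend.restore(previousState);

        // S = SnapshotResult<KeyedStateHandle>: snapshot, run the (possibly asynchronous) part, get the result.
        RunnableFuture<SnapshotResult<KeyedStateHandle>> future =
            backend.snapshot(1L, System.currentTimeMillis(), streamFactory, checkpointOptions);
        future.run();
        return future.get();
    }
}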
