http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index b7e4794..f60cb2c 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -33,10 +33,12 @@ import 
org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.TernaryBoolean;
 
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
@@ -103,10 +105,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
        @Nullable
        private OptionsFactory optionsFactory;
 
-       /** True if incremental checkpointing is enabled.
-        * Null if not yet set, in which case the configuration values will be 
used. */
-       @Nullable
-       private Boolean enableIncrementalCheckpointing;
+       /** This determines if incremental checkpointing is enabled. */
+       private final TernaryBoolean enableIncrementalCheckpointing;
 
        // -- runtime values, set on TaskManager when initializing / using the 
backend
 
@@ -201,7 +201,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
         * @param checkpointStreamBackend The backend write the checkpoint 
streams to.
         */
        public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
-               this.checkpointStreamBackend = 
checkNotNull(checkpointStreamBackend);
+               this(checkpointStreamBackend, TernaryBoolean.UNDEFINED);
        }
 
        /**
@@ -215,7 +215,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
         * @param checkpointStreamBackend The backend write the checkpoint 
streams to.
         * @param enableIncrementalCheckpointing True if incremental 
checkpointing is enabled.
         */
-       public RocksDBStateBackend(StateBackend checkpointStreamBackend, 
boolean enableIncrementalCheckpointing) {
+       public RocksDBStateBackend(StateBackend checkpointStreamBackend, 
TernaryBoolean enableIncrementalCheckpointing) {
                this.checkpointStreamBackend = 
checkNotNull(checkpointStreamBackend);
                this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
        }
@@ -225,16 +225,15 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
         */
        @Deprecated
        public RocksDBStateBackend(AbstractStateBackend 
checkpointStreamBackend) {
-               this.checkpointStreamBackend = 
checkNotNull(checkpointStreamBackend);
+               this(checkpointStreamBackend, TernaryBoolean.UNDEFINED);
        }
 
        /**
-        * @deprecated Use {@link #RocksDBStateBackend(StateBackend, boolean)} 
instead.
+        * @deprecated Use {@link #RocksDBStateBackend(StateBackend, 
TernaryBoolean)} instead.
         */
        @Deprecated
        public RocksDBStateBackend(AbstractStateBackend 
checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
-               this.checkpointStreamBackend = 
checkNotNull(checkpointStreamBackend);
-               this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
+               this(checkpointStreamBackend, 
TernaryBoolean.fromBoolean(enableIncrementalCheckpointing));
        }
 
        /**
@@ -251,13 +250,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                                originalStreamBackend;
 
                // configure incremental checkpoints
-               if (original.enableIncrementalCheckpointing != null) {
-                       this.enableIncrementalCheckpointing = 
original.enableIncrementalCheckpointing;
-               }
-               else {
-                       this.enableIncrementalCheckpointing =
-                                       
config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS);
-               }
+               this.enableIncrementalCheckpointing = 
original.enableIncrementalCheckpointing.resolveUndefined(
+                       
config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));
 
                // configure local directories
                if (original.localRocksDbDirectories != null) {
@@ -407,6 +401,9 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                File instanceBasePath =
                                new File(getNextStoragePath(), "job-" + jobId + 
"_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID());
 
+               LocalRecoveryConfig localRecoveryConfig =
+                       env.getTaskStateManager().createLocalRecoveryConfig();
+
                return new RocksDBKeyedStateBackend<>(
                                operatorIdentifier,
                                env.getUserClassLoader(),
@@ -418,7 +415,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                                numberOfKeyGroups,
                                keyGroupRange,
                                env.getExecutionConfig(),
-                               isIncrementalCheckpointsEnabled());
+                               isIncrementalCheckpointsEnabled(),
+                               localRecoveryConfig);
        }
 
        @Override
@@ -511,12 +509,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
         * Gets whether incremental checkpoints are enabled for this state 
backend.
         */
        public boolean isIncrementalCheckpointsEnabled() {
-               if (enableIncrementalCheckpointing != null) {
-                       return enableIncrementalCheckpointing;
-               }
-               else {
-                       return 
CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
-               }
+               return 
enableIncrementalCheckpointing.getOrDefault(CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue());
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index bae1f81..9958577 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -43,8 +43,10 @@ import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOut
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -55,6 +57,7 @@ import 
org.apache.flink.runtime.state.testutils.BackendForTestStream.StreamFacto
 import org.apache.flink.runtime.state.testutils.TestCheckpointStreamFactory;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -75,6 +78,8 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -186,7 +191,8 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
                TestTaskStateManager taskStateManagerTestMock = new 
TestTaskStateManager(
                        jobID,
                        executionAttemptID,
-                       checkpointResponderMock);
+                       checkpointResponderMock,
+                       TestLocalRecoveryConfig.disabled());
 
                StreamMockEnvironment mockEnv = new StreamMockEnvironment(
                        testHarness.jobConfig,
@@ -256,12 +262,16 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
                        int count = 1;
 
                        @Override
-                       public CheckpointStateOutputStream 
createCheckpointStateOutputStream(CheckpointedStateScope scope) throws 
Exception {
+                       public CheckpointStateOutputStream 
createCheckpointStateOutputStream(CheckpointedStateScope scope) throws 
IOException {
                                // we skip the first created stream, because it 
is used to checkpoint the timer service, which is
                                // currently not asynchronous.
                                if (count > 0) {
                                        --count;
-                                       return new 
MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize);
+                                       return new 
BlockingCheckpointOutputStream(
+                                               new 
MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize),
+                                               null,
+                                               null,
+                                               Integer.MAX_VALUE);
                                } else {
                                        return 
super.createCheckpointStateOutputStream(scope);
                                }
@@ -373,7 +383,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
                                StringSerializer.INSTANCE,
                                new ValueStateDescriptor<>("foobar", 
String.class));
 
-                       RunnableFuture<KeyedStateHandle> snapshotFuture = 
keyedStateBackend.snapshot(
+                       RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshotFuture = keyedStateBackend.snapshot(
                                checkpointId, timestamp,
                                new TestCheckpointStreamFactory(() -> 
outputStream),
                                
CheckpointOptions.forCheckpointWithDefaultLocation());
@@ -459,7 +469,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
                }
 
                @Override
-               public CheckpointStateOutputStream get() throws Exception {
+               public CheckpointStateOutputStream get() throws IOException {
                        return 
factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
                }
        }
@@ -472,6 +482,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
                        this.testException = testException;
                }
 
+               @Nullable
                @Override
                public StreamStateHandle closeAndGetHandle() throws IOException 
{
                        throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 3a39ba0..2dd67f5 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -411,6 +412,9 @@ public class RocksDBStateBackendConfigTest {
                TaskManagerRuntimeInfo tmInfo = new 
TestingTaskManagerRuntimeInfo(new Configuration(), tempDirStrings);
                when(env.getTaskManagerInfo()).thenReturn(tmInfo);
 
+               TestTaskStateManager taskStateManager = new 
TestTaskStateManager();
+               when(env.getTaskStateManager()).thenReturn(taskStateManager);
+
                return env;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 54af400..9466bc3 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -33,9 +33,11 @@ import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateBackendTestBase;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -231,7 +233,8 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                                1,
                                new KeyGroupRange(0, 0),
                                new ExecutionConfig(),
-                               enableIncrementalCheckpointing);
+                               enableIncrementalCheckpointing,
+                               TestLocalRecoveryConfig.disabled());
 
                        verify(columnFamilyOptions, Mockito.times(1))
                                
.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
@@ -249,7 +252,7 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                setupRocksKeyedStateBackend();
 
                try {
-                       RunnableFuture<KeyedStateHandle> snapshot =
+                       RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshot =
                                keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
 
                        RocksDB spyDB = keyedStateBackend.db;
@@ -286,7 +289,7 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        public void testDismissingSnapshot() throws Exception {
                setupRocksKeyedStateBackend();
                try {
-                       RunnableFuture<KeyedStateHandle> snapshot =
+                       RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshot =
                                keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
                        snapshot.cancel(true);
                        verifyRocksObjectsReleased();
@@ -300,7 +303,7 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        public void testDismissingSnapshotNotRunnable() throws Exception {
                setupRocksKeyedStateBackend();
                try {
-                       RunnableFuture<KeyedStateHandle> snapshot =
+                       RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshot =
                                keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
                        snapshot.cancel(true);
                        Thread asyncSnapshotThread = new Thread(snapshot);
@@ -323,7 +326,7 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        public void testCompletingSnapshot() throws Exception {
                setupRocksKeyedStateBackend();
                try {
-                       RunnableFuture<KeyedStateHandle> snapshot =
+                       RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshot =
                                keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
                        Thread asyncSnapshotThread = new Thread(snapshot);
                        asyncSnapshotThread.start();
@@ -332,7 +335,9 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                        runStateUpdates();
                        blocker.trigger(); // allow checkpointing to start 
writing
                        waiter.await(); // wait for snapshot stream writing to 
run
-                       KeyedStateHandle keyedStateHandle = snapshot.get();
+
+                       SnapshotResult<KeyedStateHandle> snapshotResult = 
snapshot.get();
+                       KeyedStateHandle keyedStateHandle = 
snapshotResult.getJobManagerOwnedSnapshot();
                        assertNotNull(keyedStateHandle);
                        assertTrue(keyedStateHandle.getStateSize() > 0);
                        assertEquals(2, 
keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
@@ -349,7 +354,8 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        public void testCancelRunningSnapshot() throws Exception {
                setupRocksKeyedStateBackend();
                try {
-                       RunnableFuture<KeyedStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+                       RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshot =
+                               keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
                        Thread asyncSnapshotThread = new Thread(snapshot);
                        asyncSnapshotThread.start();
                        waiter.await(); // wait for snapshot to run
@@ -425,7 +431,7 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                                        backend.setCurrentKey(checkpointId);
                                        state.update("Hello-" + checkpointId);
 
-                                       RunnableFuture<KeyedStateHandle> 
snapshot = backend.snapshot(
+                                       
RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = backend.snapshot(
                                                checkpointId,
                                                checkpointId,
                                                createStreamFactory(),
@@ -433,7 +439,11 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
 
                                        snapshot.run();
 
-                                       IncrementalKeyedStateHandle stateHandle 
= (IncrementalKeyedStateHandle) snapshot.get();
+                                       SnapshotResult<KeyedStateHandle> 
snapshotResult = snapshot.get();
+
+                                       IncrementalKeyedStateHandle stateHandle 
=
+                                               (IncrementalKeyedStateHandle) 
snapshotResult.getJobManagerOwnedSnapshot();
+
                                        Map<StateHandleID, StreamStateHandle> 
sharedState =
                                                new 
HashMap<>(stateHandle.getSharedState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 4f16259..4d3f9f5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -347,10 +347,7 @@ public abstract class AbstractStreamOperator<OUT>
        }
 
        @Override
-       public final OperatorSnapshotFutures snapshotState(
-                       long checkpointId,
-                       long timestamp,
-                       CheckpointOptions checkpointOptions,
+       public final OperatorSnapshotFutures snapshotState(long checkpointId, 
long timestamp, CheckpointOptions checkpointOptions,
                        CheckpointStreamFactory factory) throws Exception {
 
                KeyGroupRange keyGroupRange = null != keyedStateBackend ?

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
new file mode 100644
index 0000000..ba27a0a
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.Disposable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * This class implements the logic that creates (and potentially restores) a 
state backend. The restore logic
+ * considers multiple, prioritized options of snapshots to restore from, where 
all of the options should recreate
+ * the same state for the backend. When we fail to restore from the snapshot 
with the highest priority (typically
+ * the "fastest" to restore), we fallback to the next snapshot with the next 
highest priority. We also take care
+ * of cleaning up from failed restore attempts. We only reattempt when the 
problem occurs during the restore call
+ * and will only stop after all snapshot alternatives are exhausted and all 
failed.
+ *
+ * @param <T> type of the restored backend.
+ * @param <S> type of the supplied snapshots from which the backend restores.
+ */
+public class BackendRestorerProcedure<
+       T extends Closeable & Disposable & Snapshotable<?, Collection<S>>,
+       S extends StateObject> {
+
+       /** Logger for this class. */
+       private static final Logger LOG = 
LoggerFactory.getLogger(BackendRestorerProcedure.class);
+
+       /** Factory for new, fresh backends without state. */
+       private final SupplierWithException<T, Exception> instanceSupplier;
+
+       /** This registry is used so that recovery can participate in the task 
lifecycle, i.e. can be canceled. */
+       private final CloseableRegistry backendCloseableRegistry;
+
+       /**
+        * Creates a new backend restorer using the given backend supplier and 
the closeable registry.
+        *
+        * @param instanceSupplier factory function for new, empty backend 
instances.
+        * @param backendCloseableRegistry registry to allow participation in 
task lifecycle, e.g. react to cancel.
+        */
+       public BackendRestorerProcedure(
+               @Nonnull SupplierWithException<T, Exception> instanceSupplier,
+               @Nonnull CloseableRegistry backendCloseableRegistry) {
+
+               this.instanceSupplier = 
Preconditions.checkNotNull(instanceSupplier);
+               this.backendCloseableRegistry = 
Preconditions.checkNotNull(backendCloseableRegistry);
+       }
+
+       /**
+        * Creates a new state backend and restores it from the provided set of 
state snapshot alternatives.
+        *
+        * @param restoreOptions iterator over a prioritized set of state 
snapshot alternatives for recovery.
+        * @return the created (and restored) state backend.
+        * @throws Exception if the backend could not be created or restored.
+        */
+       public @Nonnull
+       T createAndRestore(@Nonnull Iterator<? extends Collection<S>> 
restoreOptions) throws Exception {
+
+               // This ensures that we always call the restore method even if 
there is no previous state
+               // (required by some backends).
+               Collection<S> attemptState = restoreOptions.hasNext() ?
+                       restoreOptions.next() :
+                       Collections.emptyList();
+
+               while (true) {
+                       try {
+                               return attemptCreateAndRestore(attemptState);
+                       } catch (Exception ex) {
+                               // more attempts?
+                               if (restoreOptions.hasNext()) {
+
+                                       attemptState = restoreOptions.next();
+                                       LOG.warn("Exception while restoring 
backend, will retry with another snapshot replica.", ex);
+                               } else {
+
+                                       throw new FlinkException("Could not 
restore from any of the provided restore options.", ex);
+                               }
+                       }
+               }
+       }
+
+       private T attemptCreateAndRestore(Collection<S> restoreState) throws 
Exception {
+
+               // create a new, empty backend.
+               final T backendInstance = instanceSupplier.get();
+
+               try {
+                       // register the backend with the registry to 
participate in task lifecycle w.r.t. cancellation.
+                       
backendCloseableRegistry.registerCloseable(backendInstance);
+
+                       // attempt to restore from snapshot (or null if no 
state was checkpointed).
+                       backendInstance.restore(restoreState);
+
+                       return backendInstance;
+               } catch (Exception ex) {
+
+                       // under failure, we need do close...
+                       if 
(backendCloseableRegistry.unregisterCloseable(backendInstance)) {
+                               try {
+                                       backendInstance.close();
+                               } catch (IOException closeEx) {
+                                       ex = 
ExceptionUtils.firstOrSuppressed(closeEx, ex);
+                               }
+                       }
+
+                       // ... and dispose, e.g. to release native resources.
+                       try {
+                               backendInstance.dispose();
+                       } catch (Exception disposeEx) {
+                               ex = 
ExceptionUtils.firstOrSuppressed(disposeEx, ex);
+                       }
+
+                       throw ex;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
new file mode 100644
index 0000000..7a93990
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.util.FutureUtil;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Completes the asynchronous part of an operator snapshot. The constructor runs (where not yet done) and
+ * awaits every future of the given {@link OperatorSnapshotFutures}, then splits the results into two
+ * {@link OperatorSubtaskState} replicas: one reported to the job manager, one kept for task-local recovery.
+ */
+public class OperatorSnapshotFinalizer {
+
+       /** Primary replica of the operator subtask state, reported to the job manager. */
+       private final OperatorSubtaskState jobManagerOwnedState;
+
+       /** Secondary replica of the operator subtask state, kept on the task manager for faster local recovery. */
+       private final OperatorSubtaskState taskLocalState;
+
+       /** Creates the finalizer; returns only after all four snapshot futures have delivered their results. */
+       public OperatorSnapshotFinalizer(
+               @Nonnull OperatorSnapshotFutures snapshotFutures) throws ExecutionException, InterruptedException {
+
+               // Each future is run (if not done yet) and awaited, one after another in exactly this order.
+               final SnapshotResult<KeyedStateHandle> managedKeyedResult =
+                       FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());
+               final SnapshotResult<KeyedStateHandle> rawKeyedResult =
+                       FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateRawFuture());
+               final SnapshotResult<OperatorStateHandle> managedOperatorResult =
+                       FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateManagedFuture());
+               final SnapshotResult<OperatorStateHandle> rawOperatorResult =
+                       FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateRawFuture());
+
+               this.jobManagerOwnedState = new OperatorSubtaskState(
+                       managedOperatorResult.getJobManagerOwnedSnapshot(),
+                       rawOperatorResult.getJobManagerOwnedSnapshot(),
+                       managedKeyedResult.getJobManagerOwnedSnapshot(),
+                       rawKeyedResult.getJobManagerOwnedSnapshot());
+
+               this.taskLocalState = new OperatorSubtaskState(
+                       managedOperatorResult.getTaskLocalSnapshot(),
+                       rawOperatorResult.getTaskLocalSnapshot(),
+                       managedKeyedResult.getTaskLocalSnapshot(),
+                       rawKeyedResult.getTaskLocalSnapshot());
+       }
+
+       /** Returns the secondary replica, intended for task-local recovery. */
+       public OperatorSubtaskState getTaskLocalState() {
+               return taskLocalState;
+       }
+
+       /** Returns the primary replica, intended for reporting to the job manager. */
+       public OperatorSubtaskState getJobManagerOwnedState() {
+               return jobManagerOwnedState;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
index bdaf64b..95eb213 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFutures.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.util.ExceptionUtils;
 
+import javax.annotation.Nonnull;
+
 import java.util.concurrent.RunnableFuture;
 
 /**
@@ -30,55 +34,74 @@ import java.util.concurrent.RunnableFuture;
  */
 public class OperatorSnapshotFutures {
 
-       private RunnableFuture<KeyedStateHandle> keyedStateManagedFuture;
-       private RunnableFuture<KeyedStateHandle> keyedStateRawFuture;
-       private RunnableFuture<OperatorStateHandle> operatorStateManagedFuture;
-       private RunnableFuture<OperatorStateHandle> operatorStateRawFuture;
+       @Nonnull
+       private RunnableFuture<SnapshotResult<KeyedStateHandle>> 
keyedStateManagedFuture;
+
+       @Nonnull
+       private RunnableFuture<SnapshotResult<KeyedStateHandle>> 
keyedStateRawFuture;
+
+       @Nonnull
+       private RunnableFuture<SnapshotResult<OperatorStateHandle>> 
operatorStateManagedFuture;
+
+       @Nonnull
+       private RunnableFuture<SnapshotResult<OperatorStateHandle>> 
operatorStateRawFuture;
 
        public OperatorSnapshotFutures() {
-               this(null, null, null, null);
+               this(
+                       DoneFuture.of(SnapshotResult.empty()),
+                       DoneFuture.of(SnapshotResult.empty()),
+                       DoneFuture.of(SnapshotResult.empty()),
+                       DoneFuture.of(SnapshotResult.empty()));
        }
 
        public OperatorSnapshotFutures(
-                       RunnableFuture<KeyedStateHandle> 
keyedStateManagedFuture,
-                       RunnableFuture<KeyedStateHandle> keyedStateRawFuture,
-                       RunnableFuture<OperatorStateHandle> 
operatorStateManagedFuture,
-                       RunnableFuture<OperatorStateHandle> 
operatorStateRawFuture) {
+               @Nonnull RunnableFuture<SnapshotResult<KeyedStateHandle>> 
keyedStateManagedFuture,
+               @Nonnull RunnableFuture<SnapshotResult<KeyedStateHandle>> 
keyedStateRawFuture,
+               @Nonnull RunnableFuture<SnapshotResult<OperatorStateHandle>> 
operatorStateManagedFuture,
+               @Nonnull RunnableFuture<SnapshotResult<OperatorStateHandle>> 
operatorStateRawFuture) {
                this.keyedStateManagedFuture = keyedStateManagedFuture;
                this.keyedStateRawFuture = keyedStateRawFuture;
                this.operatorStateManagedFuture = operatorStateManagedFuture;
                this.operatorStateRawFuture = operatorStateRawFuture;
        }
 
-       public RunnableFuture<KeyedStateHandle> getKeyedStateManagedFuture() {
+       @Nonnull
+       public RunnableFuture<SnapshotResult<KeyedStateHandle>> 
getKeyedStateManagedFuture() {
                return keyedStateManagedFuture;
        }
 
-       public void setKeyedStateManagedFuture(RunnableFuture<KeyedStateHandle> 
keyedStateManagedFuture) {
+       public void setKeyedStateManagedFuture(
+               @Nonnull RunnableFuture<SnapshotResult<KeyedStateHandle>> 
keyedStateManagedFuture) {
                this.keyedStateManagedFuture = keyedStateManagedFuture;
        }
 
-       public RunnableFuture<KeyedStateHandle> getKeyedStateRawFuture() {
+       @Nonnull
+       public RunnableFuture<SnapshotResult<KeyedStateHandle>> 
getKeyedStateRawFuture() {
                return keyedStateRawFuture;
        }
 
-       public void setKeyedStateRawFuture(RunnableFuture<KeyedStateHandle> 
keyedStateRawFuture) {
+       public void setKeyedStateRawFuture(
+               @Nonnull RunnableFuture<SnapshotResult<KeyedStateHandle>> 
keyedStateRawFuture) {
                this.keyedStateRawFuture = keyedStateRawFuture;
        }
 
-       public RunnableFuture<OperatorStateHandle> 
getOperatorStateManagedFuture() {
+       @Nonnull
+       public RunnableFuture<SnapshotResult<OperatorStateHandle>> 
getOperatorStateManagedFuture() {
                return operatorStateManagedFuture;
        }
 
-       public void 
setOperatorStateManagedFuture(RunnableFuture<OperatorStateHandle> 
operatorStateManagedFuture) {
+       public void setOperatorStateManagedFuture(
+               @Nonnull RunnableFuture<SnapshotResult<OperatorStateHandle>> 
operatorStateManagedFuture) {
                this.operatorStateManagedFuture = operatorStateManagedFuture;
        }
 
-       public RunnableFuture<OperatorStateHandle> getOperatorStateRawFuture() {
+       @Nonnull
+       public RunnableFuture<SnapshotResult<OperatorStateHandle>> 
getOperatorStateRawFuture() {
                return operatorStateRawFuture;
        }
 
-       public void 
setOperatorStateRawFuture(RunnableFuture<OperatorStateHandle> 
operatorStateRawFuture) {
+       public void setOperatorStateRawFuture(
+               @Nonnull RunnableFuture<SnapshotResult<OperatorStateHandle>> 
operatorStateRawFuture) {
                this.operatorStateRawFuture = operatorStateRawFuture;
        }
 
@@ -119,16 +142,4 @@ public class OperatorSnapshotFutures {
                        throw exception;
                }
        }
-
-       public boolean hasKeyedState() {
-               return keyedStateManagedFuture != null || keyedStateRawFuture 
!= null;
-       }
-
-       public boolean hasOperatorState() {
-               return operatorStateManagedFuture != null || 
operatorStateRawFuture != null;
-       }
-
-       public boolean hasState() {
-               return hasKeyedState() || hasOperatorState();
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index c3254f6..71f508b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.Disposable;
 
 import java.io.Serializable;
 
@@ -45,7 +46,7 @@ import java.io.Serializable;
  * @param <OUT> The output type of the operator
  */
 @PublicEvolving
-public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, 
Serializable {
+public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, 
Disposable, Serializable {
 
        // 
------------------------------------------------------------------------
        //  life cycle
@@ -85,6 +86,7 @@ public interface StreamOperator<OUT> extends 
CheckpointListener, KeyContext, Ser
         * <p>This method is expected to make a thorough effort to release all 
resources
         * that the operator has acquired.
         */
+       @Override
        void dispose() throws Exception;
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
index 75ead44..420b6bf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
@@ -40,7 +40,6 @@ public interface StreamOperatorStateContext {
         */
        OperatorStateBackend operatorStateBackend();
 
-
        /**
         * Returns the keyed state backend for the stream operator. This method 
returns null for non-keyed operators.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index 9887e45..11e2dda 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -23,7 +23,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -116,10 +117,8 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
 
                final String operatorIdentifierText = 
operatorSubtaskDescription.toString();
 
-               final OperatorSubtaskState operatorSubtaskStateFromJobManager =
-                       taskStateManager.operatorStates(operatorID);
-
-               final boolean restoring = (operatorSubtaskStateFromJobManager 
!= null);
+               final PrioritizedOperatorSubtaskState 
prioritizedOperatorSubtaskStates =
+                       taskStateManager.prioritizedOperatorState(operatorID);
 
                AbstractKeyedStateBackend<?> keyedStatedBackend = null;
                OperatorStateBackend operatorStateBackend = null;
@@ -133,20 +132,22 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                        keyedStatedBackend = keyedStatedBackend(
                                keySerializer,
                                operatorIdentifierText,
-                               operatorSubtaskStateFromJobManager,
+                               prioritizedOperatorSubtaskStates,
                                streamTaskCloseableRegistry);
 
                        // -------------- Operator State Backend --------------
                        operatorStateBackend = operatorStateBackend(
                                operatorIdentifierText,
-                               operatorSubtaskStateFromJobManager,
+                               prioritizedOperatorSubtaskStates,
                                streamTaskCloseableRegistry);
 
                        // -------------- Raw State Streams --------------
-                       rawKeyedStateInputs = 
rawKeyedStateInputs(operatorSubtaskStateFromJobManager);
+                       rawKeyedStateInputs = rawKeyedStateInputs(
+                               
prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState());
                        
streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
 
-                       rawOperatorStateInputs = 
rawOperatorStateInputs(operatorSubtaskStateFromJobManager);
+                       rawOperatorStateInputs = rawOperatorStateInputs(
+                               
prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState());
                        
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
 
                        // -------------- Internal Timer Service Manager 
--------------
@@ -155,7 +156,7 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                        // -------------- Preparing return value --------------
 
                        return new StreamOperatorStateContextImpl(
-                               restoring,
+                               prioritizedOperatorSubtaskStates.isRestored(),
                                operatorStateBackend,
                                keyedStatedBackend,
                                timeServiceManager,
@@ -222,155 +223,114 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
 
        protected OperatorStateBackend operatorStateBackend(
                String operatorIdentifierText,
-               OperatorSubtaskState operatorSubtaskStateFromJobManager,
+               PrioritizedOperatorSubtaskState 
prioritizedOperatorSubtaskStates,
                CloseableRegistry backendCloseableRegistry) throws Exception {
 
-               //TODO search in local state for a local recovery opportunity.
+               BackendRestorerProcedure<OperatorStateBackend, 
OperatorStateHandle> backendRestorer =
+                       new BackendRestorerProcedure<>(
+                               () -> 
stateBackend.createOperatorStateBackend(environment, operatorIdentifierText),
+                               backendCloseableRegistry);
 
-               return createOperatorStateBackendFromJobManagerState(
-                       operatorIdentifierText,
-                       operatorSubtaskStateFromJobManager,
-                       backendCloseableRegistry);
+               return 
backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());
        }
 
        protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(
                TypeSerializer<K> keySerializer,
                String operatorIdentifierText,
-               OperatorSubtaskState operatorSubtaskStateFromJobManager,
+               PrioritizedOperatorSubtaskState 
prioritizedOperatorSubtaskStates,
                CloseableRegistry backendCloseableRegistry) throws Exception {
 
                if (keySerializer == null) {
                        return null;
                }
 
-               //TODO search in local state for a local recovery opportunity.
+               TaskInfo taskInfo = environment.getTaskInfo();
+
+               final KeyGroupRange keyGroupRange = 
KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
+                       taskInfo.getMaxNumberOfParallelSubtasks(),
+                       taskInfo.getNumberOfParallelSubtasks(),
+                       taskInfo.getIndexOfThisSubtask());
 
-               return createKeyedStatedBackendFromJobManagerState(
-                       keySerializer,
-                       operatorIdentifierText,
-                       operatorSubtaskStateFromJobManager,
-                       backendCloseableRegistry);
+               BackendRestorerProcedure<AbstractKeyedStateBackend<K>, 
KeyedStateHandle> backendRestorer =
+                       new BackendRestorerProcedure<>(
+                               () -> stateBackend.createKeyedStateBackend(
+                                       environment,
+                                       environment.getJobID(),
+                                       operatorIdentifierText,
+                                       keySerializer,
+                                       
taskInfo.getMaxNumberOfParallelSubtasks(),
+                                       keyGroupRange,
+                                       environment.getTaskKvStateRegistry()),
+                                       backendCloseableRegistry);
+
+               return 
backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
        }
 
        protected CloseableIterable<StatePartitionStreamProvider> 
rawOperatorStateInputs(
-               OperatorSubtaskState operatorSubtaskStateFromJobManager) {
+               Iterator<StateObjectCollection<OperatorStateHandle>> 
restoreStateAlternatives) {
 
-               if (operatorSubtaskStateFromJobManager != null) {
+               if (restoreStateAlternatives.hasNext()) {
 
                        final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
 
-                       Collection<OperatorStateHandle> rawOperatorState =
-                               
operatorSubtaskStateFromJobManager.getRawOperatorState();
-
-                       return new 
CloseableIterable<StatePartitionStreamProvider>() {
-                               @Override
-                               public void close() throws IOException {
-                                       closeableRegistry.close();
-                               }
-
-                               @Nonnull
-                               @Override
-                               public Iterator<StatePartitionStreamProvider> 
iterator() {
-                                       return new OperatorStateStreamIterator(
-                                               
DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
-                                               rawOperatorState.iterator(), 
closeableRegistry);
-                               }
-                       };
-               }
-
-               return CloseableIterable.empty();
-       }
-
-       protected CloseableIterable<KeyGroupStatePartitionStreamProvider> 
rawKeyedStateInputs(
-               OperatorSubtaskState operatorSubtaskStateFromJobManager) {
+                       Collection<OperatorStateHandle> rawOperatorState = 
restoreStateAlternatives.next();
+                       // TODO currently this does not support local state 
recovery, so we expect there is only one handle.
+                       Preconditions.checkState(
+                               !restoreStateAlternatives.hasNext(),
+                               "Local recovery is currently not implemented 
for raw operator state, but found state alternative.");
 
-               if (operatorSubtaskStateFromJobManager != null) {
+                       if (rawOperatorState != null) {
 
-                       Collection<KeyedStateHandle> rawKeyedState = 
operatorSubtaskStateFromJobManager.getRawKeyedState();
-                       Collection<KeyGroupsStateHandle> keyGroupsStateHandles 
= transform(rawKeyedState);
-                       final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
-
-                       return new 
CloseableIterable<KeyGroupStatePartitionStreamProvider>() {
-                               @Override
-                               public void close() throws IOException {
-                                       closeableRegistry.close();
-                               }
+                               return new 
CloseableIterable<StatePartitionStreamProvider>() {
+                                       @Override
+                                       public void close() throws IOException {
+                                               closeableRegistry.close();
+                                       }
 
-                               @Override
-                               public 
Iterator<KeyGroupStatePartitionStreamProvider> iterator() {
-                                       return new 
KeyGroupStreamIterator(keyGroupsStateHandles.iterator(), closeableRegistry);
-                               }
-                       };
+                                       @Nonnull
+                                       @Override
+                                       public 
Iterator<StatePartitionStreamProvider> iterator() {
+                                               return new 
OperatorStateStreamIterator(
+                                                       
DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
+                                                       
rawOperatorState.iterator(), closeableRegistry);
+                                       }
+                               };
+                       }
                }
 
                return CloseableIterable.empty();
        }
 
-       // 
=================================================================================================================
-
-       private OperatorStateBackend 
createOperatorStateBackendFromJobManagerState(
-               String operatorIdentifierText,
-               OperatorSubtaskState operatorSubtaskStateFromJobManager,
-               CloseableRegistry backendCloseableRegistry) throws Exception {
-
-               final OperatorStateBackend operatorStateBackend =
-                       stateBackend.createOperatorStateBackend(environment, 
operatorIdentifierText);
-
-               
backendCloseableRegistry.registerCloseable(operatorStateBackend);
-
-               Collection<OperatorStateHandle> managedOperatorState = null;
-
-               if (operatorSubtaskStateFromJobManager != null) {
-                       managedOperatorState = 
operatorSubtaskStateFromJobManager.getManagedOperatorState();
-               }
-
-               operatorStateBackend.restore(managedOperatorState);
-
-               return operatorStateBackend;
-       }
+       protected CloseableIterable<KeyGroupStatePartitionStreamProvider> 
rawKeyedStateInputs(
+               Iterator<StateObjectCollection<KeyedStateHandle>> 
restoreStateAlternatives) {
 
-       private <K> AbstractKeyedStateBackend<K> 
createKeyedStatedBackendFromJobManagerState(
-               TypeSerializer<K> keySerializer,
-               String operatorIdentifierText,
-               OperatorSubtaskState operatorSubtaskStateFromJobManager,
-               CloseableRegistry backendCloseableRegistry) throws Exception {
+               if (restoreStateAlternatives.hasNext()) {
+                       Collection<KeyedStateHandle> rawKeyedState = 
restoreStateAlternatives.next();
 
-               final AbstractKeyedStateBackend<K> keyedStateBackend = 
createKeyedStateBackend(
-                       operatorIdentifierText,
-                       keySerializer);
+                       // TODO currently this does not support local state 
recovery, so we expect there is only one handle.
+                       Preconditions.checkState(
+                               !restoreStateAlternatives.hasNext(),
+                               "Local recovery is currently not implemented 
for raw keyed state, but found state alternative.");
 
-               backendCloseableRegistry.registerCloseable(keyedStateBackend);
+                       if (rawKeyedState != null) {
+                               Collection<KeyGroupsStateHandle> 
keyGroupsStateHandles = transform(rawKeyedState);
+                               final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
 
-               Collection<KeyedStateHandle> managedKeyedState = null;
+                               return new 
CloseableIterable<KeyGroupStatePartitionStreamProvider>() {
+                                       @Override
+                                       public void close() throws IOException {
+                                               closeableRegistry.close();
+                                       }
 
-               if (operatorSubtaskStateFromJobManager != null) {
-                       managedKeyedState = 
operatorSubtaskStateFromJobManager.getManagedKeyedState();
+                                       @Override
+                                       public 
Iterator<KeyGroupStatePartitionStreamProvider> iterator() {
+                                               return new 
KeyGroupStreamIterator(keyGroupsStateHandles.iterator(), closeableRegistry);
+                                       }
+                               };
+                       }
                }
 
-               keyedStateBackend.restore(managedKeyedState);
-
-               return keyedStateBackend;
-       }
-
-       private <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
-               String operatorIdentifier,
-               TypeSerializer<K> keySerializer) throws Exception {
-
-               TaskInfo taskInfo = environment.getTaskInfo();
-
-               final KeyGroupRange keyGroupRange = 
KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
-                       taskInfo.getMaxNumberOfParallelSubtasks(),
-                       taskInfo.getNumberOfParallelSubtasks(),
-                       taskInfo.getIndexOfThisSubtask());
-
-               return stateBackend.createKeyedStateBackend(
-                       environment,
-                       environment.getJobID(),
-                       operatorIdentifier,
-                       keySerializer,
-                       taskInfo.getMaxNumberOfParallelSubtasks(), //TODO 
check: this is numberOfKeyGroups !!!!
-                       keyGroupRange,
-                       environment.getTaskKvStateRegistry());
+               return CloseableIterable.empty();
        }
 
        // 
=================================================================================================================

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 06cb18b..dba4c87 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -27,7 +27,6 @@ import org.apache.flink.core.fs.FileSystemSafetyNet;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -46,6 +45,7 @@ import 
org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -58,7 +58,6 @@ import 
org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -800,7 +799,11 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
        // 
------------------------------------------------------------------------
 
-       private static final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
+       /**
+        * This runnable executes the asynchronous parts of all involved 
backend snapshots for the subtask.
+        */
+       @VisibleForTesting
+       protected static final class AsyncCheckpointRunnable implements 
Runnable, Closeable {
 
                private final StreamTask<?, ?> owner;
 
@@ -831,11 +834,12 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                @Override
                public void run() {
                        FileSystemSafetyNet.initializeSafetyNetForThread();
-                       final long checkpointId = 
checkpointMetaData.getCheckpointId();
                        try {
 
-                               boolean hasState = false;
-                               final TaskStateSnapshot 
taskOperatorSubtaskStates =
+                               TaskStateSnapshot 
jobManagerTaskOperatorSubtaskStates =
+                                       new 
TaskStateSnapshot(operatorSnapshotsInProgress.size());
+
+                               TaskStateSnapshot 
localTaskOperatorSubtaskStates =
                                        new 
TaskStateSnapshot(operatorSnapshotsInProgress.size());
 
                                for (Map.Entry<OperatorID, 
OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
@@ -843,15 +847,17 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                        OperatorID operatorID = entry.getKey();
                                        OperatorSnapshotFutures 
snapshotInProgress = entry.getValue();
 
-                                       OperatorSubtaskState 
operatorSubtaskState = new OperatorSubtaskState(
-                                               
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()),
-                                               
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()),
-                                               
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateManagedFuture()),
-                                               
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateRawFuture())
-                                       );
+                                       // finalize the async part of all snapshots by executing all snapshot runnables
+                                       OperatorSnapshotFinalizer 
finalizedSnapshots =
+                                               new 
OperatorSnapshotFinalizer(snapshotInProgress);
 
-                                       hasState |= 
operatorSubtaskState.hasState();
-                                       
taskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID, 
operatorSubtaskState);
+                                       
jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
+                                               operatorID,
+                                               
finalizedSnapshots.getJobManagerOwnedState());
+
+                                       
localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
+                                               operatorID,
+                                               
finalizedSnapshots.getTaskLocalState());
                                }
 
                                final long asyncEndNanos = System.nanoTime();
@@ -862,23 +868,10 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                if 
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
                                        
CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
 
-                                       TaskStateSnapshot acknowledgedState = 
hasState ? taskOperatorSubtaskStates : null;
-
-                                       TaskStateManager taskStateManager = 
owner.getEnvironment().getTaskStateManager();
-
-                                       // we signal stateless tasks by 
reporting null, so that there are no attempts to assign empty state
-                                       // to stateless tasks on restore. This 
enables simple job modifications that only concern
-                                       // stateless without the need to assign 
them uids to match their (always empty) states.
-                                       taskStateManager.reportStateHandles(
-                                               checkpointMetaData,
-                                               checkpointMetrics,
-                                               acknowledgedState);
-
-                                       LOG.debug("{} - finished asynchronous 
part of checkpoint {}. Asynchronous duration: {} ms",
-                                               owner.getName(), 
checkpointMetaData.getCheckpointId(), asyncDurationMillis);
-
-                                       LOG.trace("{} - reported the following 
states in snapshot for checkpoint {}: {}.",
-                                               owner.getName(), 
checkpointMetaData.getCheckpointId(), acknowledgedState);
+                                       reportCompletedSnapshotStates(
+                                               
jobManagerTaskOperatorSubtaskStates,
+                                               localTaskOperatorSubtaskStates,
+                                               asyncDurationMillis);
 
                                } else {
                                        LOG.debug("{} - asynchronous part of 
checkpoint {} could not be completed because it was closed before.",
@@ -886,32 +879,66 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                                
checkpointMetaData.getCheckpointId());
                                }
                        } catch (Exception e) {
-                               // the state is completed if an exception 
occurred in the acknowledgeCheckpoint call
-                               // in order to clean up, we have to set it to 
RUNNING again.
-                               asyncCheckpointState.compareAndSet(
-                                       
CheckpointingOperation.AsynCheckpointState.COMPLETED,
-                                       
CheckpointingOperation.AsynCheckpointState.RUNNING);
-
-                               try {
-                                       cleanup();
-                               } catch (Exception cleanupException) {
-                                       e.addSuppressed(cleanupException);
-                               }
-
-                               Exception checkpointException = new Exception(
-                                       "Could not materialize checkpoint " + 
checkpointId + " for operator " +
-                                               owner.getName() + '.',
-                                       e);
-
-                               
owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
-                                       checkpointMetaData,
-                                       checkpointException);
+                               handleExecutionException(e);
                        } finally {
                                owner.cancelables.unregisterCloseable(this);
                                
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                        }
                }
 
+               /**
+                * Reports the finalized snapshot states of the checkpoint to the task's {@link TaskStateManager}.
+                *
+                * <p>A snapshot that contains no state is reported as {@code null}, so that stateless tasks are not
+                * assigned (empty) state on restore. Task-local (cached) state without corresponding primary state
+                * for the job manager fails the precondition check, because it indicates a problem.
+                *
+                * @param acknowledgedTaskStateSnapshot the primary state that is reported to the job manager.
+                * @param localTaskStateSnapshot the task-local (cached) counterpart of the primary state.
+                * @param asyncDurationMillis duration of the asynchronous checkpoint part, used for logging only.
+                */
+               private void reportCompletedSnapshotStates(
+                       TaskStateSnapshot acknowledgedTaskStateSnapshot,
+                       TaskStateSnapshot localTaskStateSnapshot,
+                       long asyncDurationMillis) {
+
+                       TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
+
+                       boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
+                       boolean hasLocalState = localTaskStateSnapshot.hasState();
+
+                       Preconditions.checkState(hasAckState || !hasLocalState,
+                               "Found cached state but no corresponding primary state is reported to the job " +
+                                       "manager. This indicates a problem.");
+
+                       // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
+                       // to stateless tasks on restore. This enables simple job modifications that only concern
+                       // stateless without the need to assign them uids to match their (always empty) states.
+                       taskStateManager.reportTaskStateSnapshots(
+                               checkpointMetaData,
+                               checkpointMetrics,
+                               hasAckState ? acknowledgedTaskStateSnapshot : null,
+                               hasLocalState ? localTaskStateSnapshot : null);
+
+                       LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
+                               owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
+
+                       // log both reported snapshots; previously only the job manager state was included here
+                       LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {} (task-local: {}).",
+                               owner.getName(), checkpointMetaData.getCheckpointId(),
+                               acknowledgedTaskStateSnapshot, localTaskStateSnapshot);
+               }
+
+               /**
+                * Handles a failure of the asynchronous checkpoint part.
+                *
+                * <p>Resets the state from COMPLETED back to RUNNING, so that {@code cleanup()} can do its work. Any
+                * exception thrown by the cleanup is attached to {@code e} as suppressed. Finally, {@code e} is wrapped
+                * into an exception naming the checkpoint and operator, and passed to the owner's asynchronous
+                * checkpoint exception handler.
+                *
+                * @param e the exception that made the asynchronous checkpoint part fail.
+                */
+               private void handleExecutionException(Exception e) {
+                       // the state is already COMPLETED if the exception occurred while reporting the snapshot states
+                       // (reportCompletedSnapshotStates); in order to clean up, we have to set it to RUNNING again.
+                       asyncCheckpointState.compareAndSet(
+                               CheckpointingOperation.AsynCheckpointState.COMPLETED,
+                               CheckpointingOperation.AsynCheckpointState.RUNNING);
+
+                       try {
+                               cleanup();
+                       } catch (Exception cleanupException) {
+                               e.addSuppressed(cleanupException);
+                       }
+
+                       Exception checkpointException = new Exception(
+                               "Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " +
+                                       owner.getName() + '.',
+                               e);
+
+                       owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
+                               checkpointMetaData,
+                               checkpointException);
+               }
+
                @Override
                public void close() {
                        try {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 85069b5..904ff64 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -569,8 +570,8 @@ public class AbstractStreamOperatorTest {
 
                final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
 
-               RunnableFuture<KeyedStateHandle> futureKeyedStateHandle = 
mock(RunnableFuture.class);
-               RunnableFuture<OperatorStateHandle> futureOperatorStateHandle = 
mock(RunnableFuture.class);
+               RunnableFuture<SnapshotResult<KeyedStateHandle>> 
futureKeyedStateHandle = mock(RunnableFuture.class);
+               RunnableFuture<SnapshotResult<OperatorStateHandle>> 
futureOperatorStateHandle = mock(RunnableFuture.class);
 
                StateSnapshotContextSynchronousImpl context = 
mock(StateSnapshotContextSynchronousImpl.class);
                
when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
@@ -593,7 +594,7 @@ public class AbstractStreamOperatorTest {
 
                doReturn(containingTask).when(operator).getContainingTask();
 
-               RunnableFuture<OperatorStateHandle> 
futureManagedOperatorStateHandle = mock(RunnableFuture.class);
+               RunnableFuture<SnapshotResult<OperatorStateHandle>> 
futureManagedOperatorStateHandle = mock(RunnableFuture.class);
 
                OperatorStateBackend operatorStateBackend = 
mock(OperatorStateBackend.class);
                when(operatorStateBackend.snapshot(

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java
new file mode 100644
index 0000000..2126f70
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedureTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.util.BlockingFSDataInputStream;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.verifyZeroInteractions;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for {@link BackendRestorerProcedure}.
+ */
+public class BackendRestorerProcedureTest extends TestLogger {
+
+       /** Creates a fresh, empty operator state backend for every restore attempt. */
+       private final SupplierWithException<OperatorStateBackend, Exception> backendSupplier =
+               () -> new DefaultOperatorStateBackend(
+                       getClass().getClassLoader(),
+                       new ExecutionConfig(),
+                       true);
+
+       /**
+        * Tests that the restore procedure follows the order of the iterator and retries failed attempts as long as
+        * there are more options.
+        */
+       @Test
+       public void testRestoreProcedureOrderAndFailure() throws Exception {
+
+               CloseableRegistry closeableRegistry = new CloseableRegistry();
+               CheckpointStreamFactory checkpointStreamFactory = new MemCheckpointStreamFactory(1024);
+
+               ListStateDescriptor<Integer> stateDescriptor = new ListStateDescriptor<>("test-state", Integer.class);
+               OperatorStateBackend originalBackend = backendSupplier.get();
+               SnapshotResult<OperatorStateHandle> snapshotResult;
+
+               try {
+                       ListState<Integer> listState = originalBackend.getListState(stateDescriptor);
+
+                       listState.add(0);
+                       listState.add(1);
+                       listState.add(2);
+                       listState.add(3);
+
+                       RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
+                               originalBackend.snapshot(0L, 0L, checkpointStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
+
+                       snapshot.run();
+                       snapshotResult = snapshot.get();
+
+               } finally {
+                       originalBackend.close();
+                       originalBackend.dispose();
+               }
+
+               // the first option is an empty mock, so restoring from it fails; the second holds the real snapshot
+               OperatorStateHandle firstFailHandle = mock(OperatorStateHandle.class);
+               OperatorStateHandle secondSuccessHandle = spy(snapshotResult.getJobManagerOwnedSnapshot());
+               OperatorStateHandle thirdNotUsedHandle = mock(OperatorStateHandle.class);
+
+               List<StateObjectCollection<OperatorStateHandle>> sortedRestoreOptions = Arrays.asList(
+                       new StateObjectCollection<>(Collections.singletonList(firstFailHandle)),
+                       new StateObjectCollection<>(Collections.singletonList(secondSuccessHandle)),
+                       new StateObjectCollection<>(Collections.singletonList(thirdNotUsedHandle)));
+               Iterator<StateObjectCollection<OperatorStateHandle>> iterator = sortedRestoreOptions.iterator();
+
+               BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure =
+                       new BackendRestorerProcedure<>(backendSupplier, closeableRegistry);
+
+               OperatorStateBackend restoredBackend = restorerProcedure.createAndRestore(iterator);
+               Assert.assertNotNull(restoredBackend);
+
+               try {
+                       // the third option must still be unconsumed in the iterator and must never have been touched
+                       Assert.assertTrue(iterator.hasNext());
+                       Assert.assertSame(thirdNotUsedHandle, iterator.next().iterator().next());
+                       verify(firstFailHandle).openInputStream();
+                       verify(secondSuccessHandle).openInputStream();
+                       verifyZeroInteractions(thirdNotUsedHandle);
+                       Assert.assertFalse(iterator.hasNext());
+
+                       ListState<Integer> listState = restoredBackend.getListState(stateDescriptor);
+
+                       Iterator<Integer> stateIterator = listState.get().iterator();
+                       Assert.assertEquals(0, (int) stateIterator.next());
+                       Assert.assertEquals(1, (int) stateIterator.next());
+                       Assert.assertEquals(2, (int) stateIterator.next());
+                       Assert.assertEquals(3, (int) stateIterator.next());
+                       Assert.assertFalse(stateIterator.hasNext());
+
+               } finally {
+                       restoredBackend.close();
+                       restoredBackend.dispose();
+               }
+       }
+
+       /**
+        * Tests that an exception is thrown if all restore attempts are exhausted and have failed.
+        */
+       @Test
+       public void testExceptionThrownIfAllRestoresFailed() throws Exception {
+
+               CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+               OperatorStateHandle firstFailHandle = mock(OperatorStateHandle.class);
+               OperatorStateHandle secondFailHandle = mock(OperatorStateHandle.class);
+               OperatorStateHandle thirdFailHandle = mock(OperatorStateHandle.class);
+
+               List<StateObjectCollection<OperatorStateHandle>> sortedRestoreOptions = Arrays.asList(
+                       new StateObjectCollection<>(Collections.singletonList(firstFailHandle)),
+                       new StateObjectCollection<>(Collections.singletonList(secondFailHandle)),
+                       new StateObjectCollection<>(Collections.singletonList(thirdFailHandle)));
+               Iterator<StateObjectCollection<OperatorStateHandle>> iterator = sortedRestoreOptions.iterator();
+
+               BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure =
+                       new BackendRestorerProcedure<>(backendSupplier, closeableRegistry);
+
+               try {
+                       restorerProcedure.createAndRestore(iterator);
+                       Assert.fail("Expected an exception because every restore option fails.");
+               } catch (Exception ignored) {
+                       // expected: all restore options are exhausted; Assert.fail's AssertionError is not caught here
+               }
+
+               verify(firstFailHandle).openInputStream();
+               verify(secondFailHandle).openInputStream();
+               verify(thirdFailHandle).openInputStream();
+               Assert.assertFalse(iterator.hasNext());
+       }
+
+       /**
+        * Tests that the restore can be stopped via the provided closeable registry.
+        */
+       @Test
+       public void testCanBeCanceledViaRegistry() throws Exception {
+               CloseableRegistry closeableRegistry = new CloseableRegistry();
+               OneShotLatch waitForBlock = new OneShotLatch();
+               OneShotLatch unblock = new OneShotLatch();
+               OperatorStateHandle blockingRestoreHandle = mock(OperatorStateHandle.class);
+               when(blockingRestoreHandle.openInputStream()).thenReturn(new BlockingFSDataInputStream(waitForBlock, unblock));
+
+               List<StateObjectCollection<OperatorStateHandle>> sortedRestoreOptions =
+                       Collections.singletonList(new StateObjectCollection<>(Collections.singletonList(blockingRestoreHandle)));
+
+               BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorerProcedure =
+                       new BackendRestorerProcedure<>(backendSupplier, closeableRegistry);
+
+               AtomicReference<Exception> exceptionReference = new AtomicReference<>(null);
+               Thread restoreThread = new Thread(() -> {
+                       try {
+                               restorerProcedure.createAndRestore(sortedRestoreOptions.iterator());
+                       } catch (Exception e) {
+                               exceptionReference.set(e);
+                       }
+               });
+
+               // close the registry while the restore is blocked reading its input stream
+               restoreThread.start();
+               waitForBlock.await();
+               closeableRegistry.close();
+               unblock.trigger();
+               restoreThread.join();
+
+               Exception exception = exceptionReference.get();
+               Assert.assertTrue("Expected a FlinkException but got: " + exception, exception instanceof FlinkException);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java
new file mode 100644
index 0000000..173b49c
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizerTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateHandleDummyUtil;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Random;
+
+/**
+ * Tests for {@link OperatorSnapshotFinalizer}.
+ */
+public class OperatorSnapshotFinalizerTest extends TestLogger {
+
+       /**
+        * Tests that the runnable futures are executed and the result is correctly extracted.
+        */
+       @Test
+       public void testRunAndExtract() throws Exception {
+
+               Random random = new Random(0x42);
+
+               KeyedStateHandle keyedTemplate =
+                       StateHandleDummyUtil.createNewKeyedStateHandle(new KeyGroupRange(0, 0));
+               OperatorStateHandle operatorTemplate =
+                       StateHandleDummyUtil.createNewOperatorStateHandle(2, random);
+
+               SnapshotResult<KeyedStateHandle> snapKeyMan = SnapshotResult.withLocalState(
+                       StateHandleDummyUtil.deepDummyCopy(keyedTemplate),
+                       StateHandleDummyUtil.deepDummyCopy(keyedTemplate));
+
+               SnapshotResult<KeyedStateHandle> snapKeyRaw = SnapshotResult.withLocalState(
+                       StateHandleDummyUtil.deepDummyCopy(keyedTemplate),
+                       StateHandleDummyUtil.deepDummyCopy(keyedTemplate));
+
+               SnapshotResult<OperatorStateHandle> snapOpMan = SnapshotResult.withLocalState(
+                       StateHandleDummyUtil.deepDummyCopy(operatorTemplate),
+                       StateHandleDummyUtil.deepDummyCopy(operatorTemplate));
+
+               SnapshotResult<OperatorStateHandle> snapOpRaw = SnapshotResult.withLocalState(
+                       StateHandleDummyUtil.deepDummyCopy(operatorTemplate),
+                       StateHandleDummyUtil.deepDummyCopy(operatorTemplate));
+
+               DoneFuture<SnapshotResult<KeyedStateHandle>> managedKeyed = new PseudoNotDoneFuture<>(snapKeyMan);
+               DoneFuture<SnapshotResult<KeyedStateHandle>> rawKeyed = new PseudoNotDoneFuture<>(snapKeyRaw);
+               DoneFuture<SnapshotResult<OperatorStateHandle>> managedOp = new PseudoNotDoneFuture<>(snapOpMan);
+               DoneFuture<SnapshotResult<OperatorStateHandle>> rawOp = new PseudoNotDoneFuture<>(snapOpRaw);
+
+               // the futures start out as not done; constructing the finalizer must run all of them
+               Assert.assertFalse(managedKeyed.isDone());
+               Assert.assertFalse(rawKeyed.isDone());
+               Assert.assertFalse(managedOp.isDone());
+               Assert.assertFalse(rawOp.isDone());
+
+               OperatorSnapshotFutures futures = new OperatorSnapshotFutures(managedKeyed, rawKeyed, managedOp, rawOp);
+               OperatorSnapshotFinalizer operatorSnapshotFinalizer = new OperatorSnapshotFinalizer(futures);
+
+               Assert.assertTrue(managedKeyed.isDone());
+               Assert.assertTrue(rawKeyed.isDone());
+               Assert.assertTrue(managedOp.isDone());
+               Assert.assertTrue(rawOp.isDone());
+
+               OperatorSubtaskState jobManagerOwnedState = operatorSnapshotFinalizer.getJobManagerOwnedState();
+               Assert.assertTrue(checkResult(snapKeyMan.getJobManagerOwnedSnapshot(), jobManagerOwnedState.getManagedKeyedState()));
+               Assert.assertTrue(checkResult(snapKeyRaw.getJobManagerOwnedSnapshot(), jobManagerOwnedState.getRawKeyedState()));
+               Assert.assertTrue(checkResult(snapOpMan.getJobManagerOwnedSnapshot(), jobManagerOwnedState.getManagedOperatorState()));
+               Assert.assertTrue(checkResult(snapOpRaw.getJobManagerOwnedSnapshot(), jobManagerOwnedState.getRawOperatorState()));
+
+               OperatorSubtaskState taskLocalState = operatorSnapshotFinalizer.getTaskLocalState();
+               Assert.assertTrue(checkResult(snapKeyMan.getTaskLocalSnapshot(), taskLocalState.getManagedKeyedState()));
+               Assert.assertTrue(checkResult(snapKeyRaw.getTaskLocalSnapshot(), taskLocalState.getRawKeyedState()));
+               Assert.assertTrue(checkResult(snapOpMan.getTaskLocalSnapshot(), taskLocalState.getManagedOperatorState()));
+               Assert.assertTrue(checkResult(snapOpRaw.getTaskLocalSnapshot(), taskLocalState.getRawOperatorState()));
+       }
+
+       /**
+        * Returns true if {@code actual} is empty when {@code expected} is {@code null}, or otherwise contains exactly
+        * one element that is the same instance as {@code expected}.
+        */
+       private <T extends StateObject> boolean checkResult(T expected, StateObjectCollection<T> actual) {
+               if (expected == null) {
+                       return actual.isEmpty();
+               }
+
+               return actual.size() == 1 && expected == actual.iterator().next();
+       }
+
+       /**
+        * A {@link DoneFuture} that reports itself as not done until {@link #run()} has been called, so the test can
+        * observe whether the finalizer actually ran the future.
+        */
+       static class PseudoNotDoneFuture<T> extends DoneFuture<T> {
+
+               private boolean done;
+
+               PseudoNotDoneFuture(T payload) {
+                       super(payload);
+                       this.done = false;
+               }
+
+               @Override
+               public void run() {
+                       super.run();
+                       this.done = true;
+               }
+
+               @Override
+               public boolean isDone() {
+                       return done;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java
index 6da39af..4122a71 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFuturesTest.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -28,7 +31,7 @@ import java.util.concurrent.RunnableFuture;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.spy;
 
 /**
  * Tests for {@link OperatorSnapshotFutures}.
@@ -46,20 +49,28 @@ public class OperatorSnapshotFuturesTest extends TestLogger 
{
                operatorSnapshotResult.cancel();
 
                KeyedStateHandle keyedManagedStateHandle = 
mock(KeyedStateHandle.class);
-               RunnableFuture<KeyedStateHandle> keyedStateManagedFuture = 
mock(RunnableFuture.class);
-               
when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle);
+               SnapshotResult<KeyedStateHandle> keyedStateManagedResult =
+                       SnapshotResult.of(keyedManagedStateHandle);
+               RunnableFuture<SnapshotResult<KeyedStateHandle>> 
keyedStateManagedFuture =
+                       spy(DoneFuture.of(keyedStateManagedResult));
 
                KeyedStateHandle keyedRawStateHandle = 
mock(KeyedStateHandle.class);
-               RunnableFuture<KeyedStateHandle> keyedStateRawFuture = 
mock(RunnableFuture.class);
-               when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle);
-
-               OperatorStateHandle operatorManagedStateHandle = 
mock(OperatorStateHandle.class);
-               RunnableFuture<OperatorStateHandle> operatorStateManagedFuture 
= mock(RunnableFuture.class);
-               
when(operatorStateManagedFuture.get()).thenReturn(operatorManagedStateHandle);
-
-               OperatorStateHandle operatorRawStateHandle = 
mock(OperatorStateHandle.class);
-               RunnableFuture<OperatorStateHandle> operatorStateRawFuture = 
mock(RunnableFuture.class);
-               
when(operatorStateRawFuture.get()).thenReturn(operatorRawStateHandle);
+               SnapshotResult<KeyedStateHandle> keyedStateRawResult =
+                       SnapshotResult.of(keyedRawStateHandle);
+               RunnableFuture<SnapshotResult<KeyedStateHandle>> 
keyedStateRawFuture =
+                       spy(DoneFuture.of(keyedStateRawResult));
+
+               OperatorStateHandle operatorManagedStateHandle = 
mock(OperatorStreamStateHandle.class);
+               SnapshotResult<OperatorStateHandle> operatorStateManagedResult =
+                       SnapshotResult.of(operatorManagedStateHandle);
+               RunnableFuture<SnapshotResult<OperatorStateHandle>> 
operatorStateManagedFuture =
+                       spy(DoneFuture.of(operatorStateManagedResult));
+
+               OperatorStateHandle operatorRawStateHandle = 
mock(OperatorStreamStateHandle.class);
+               SnapshotResult<OperatorStateHandle> operatorStateRawResult =
+                       SnapshotResult.of(operatorRawStateHandle);
+               RunnableFuture<SnapshotResult<OperatorStateHandle>> 
operatorStateRawFuture =
+                       spy(DoneFuture.of(operatorStateRawResult));
 
                operatorSnapshotResult = new OperatorSnapshotFutures(
                        keyedStateManagedFuture,

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index c5e5df8..90649f2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -42,12 +43,13 @@ import 
org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.StatePartitionStreamProvider;
-import org.apache.flink.runtime.state.TaskLocalStateStore;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
+import org.apache.flink.runtime.state.TestTaskLocalStateStore;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
@@ -61,7 +63,6 @@ import org.junit.Test;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -135,15 +136,15 @@ public class StateInitializationContextImplTest {
                                        
DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
                                        new 
OperatorStateHandle.StateMetaInfo(offsets.toArray(), 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
                        OperatorStateHandle operatorStateHandle =
-                                       new OperatorStateHandle(offsetsMap, new 
ByteStateHandleCloseChecking("os-" + i, out.toByteArray()));
+                                       new 
OperatorStreamStateHandle(offsetsMap, new ByteStateHandleCloseChecking("os-" + 
i, out.toByteArray()));
                        operatorStateHandles.add(operatorStateHandle);
                }
 
                OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState(
-                       Collections.emptyList(),
-                       operatorStateHandles,
-                       Collections.emptyList(),
-                       keyedStateHandles);
+                       StateObjectCollection.empty(),
+                       new StateObjectCollection<>(operatorStateHandles),
+                       StateObjectCollection.empty(),
+                       new StateObjectCollection<>(keyedStateHandles));
 
                OperatorID operatorID = new OperatorID();
                TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot();
@@ -154,7 +155,7 @@ public class StateInitializationContextImplTest {
                TaskStateManager manager = new TaskStateManagerImpl(
                        new JobID(),
                        new ExecutionAttemptID(),
-                       mock(TaskLocalStateStore.class),
+                       new TestTaskLocalStateStore(),
                        jobManagerTaskRestore,
                        mock(CheckpointResponder.class));
 

Reply via email to