[FLINK-8360][checkpointing] Implement state storage for local recovery and 
integrate with task lifecycle


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df3e6bb7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df3e6bb7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df3e6bb7

Branch: refs/heads/master
Commit: df3e6bb7627db03635febd48eff4c10032b668ef
Parents: ea0d16d
Author: Stefan Richter <[email protected]>
Authored: Tue Feb 6 14:44:01 2018 +0100
Committer: Stefan Richter <[email protected]>
Committed: Sun Feb 25 15:14:21 2018 +0100

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducer011ITCase.java      |  31 +-
 .../configuration/CheckpointingOptions.java     |  14 +
 .../flink/configuration/ConfigurationUtils.java |  28 +-
 .../flink/core/fs/local/LocalFileSystem.java    |  10 +-
 .../java/org/apache/flink/util/Disposable.java  |  36 +++
 .../flink/util/MethodForwardingTestUtil.java    | 179 ++++++++++
 .../flink/docs/rest/RestAPIDocGenerator.java    |   4 +-
 .../clusterframework/MesosTaskManager.scala     |   3 +
 .../KVStateRequestSerializerRocksDBTest.java    |   7 +-
 .../network/KvStateRequestSerializerTest.java   |  19 +-
 .../history/HistoryServerArchiveFetcher.java    |   4 +-
 .../checkpoint/JobManagerTaskRestore.java       |  10 +-
 .../checkpoint/OperatorSubtaskState.java        |  97 ++----
 .../PrioritizedOperatorSubtaskState.java        | 297 +++++++++++++++++
 .../RoundRobinOperatorStateRepartitioner.java   |  22 +-
 .../checkpoint/StateAssignmentOperation.java    |  10 +-
 .../checkpoint/StateObjectCollection.java       | 205 ++++++++++++
 .../flink/runtime/checkpoint/SubtaskState.java  |   2 +-
 .../runtime/checkpoint/TaskStateSnapshot.java   |  21 +-
 .../runtime/checkpoint/savepoint/Savepoint.java |   8 +-
 .../savepoint/SavepointV1Serializer.java        |   7 +-
 .../checkpoint/savepoint/SavepointV2.java       |   2 +-
 .../savepoint/SavepointV2Serializer.java        |   7 +-
 .../flink/runtime/executiongraph/Execution.java |   2 +-
 .../runtime/operators/sort/InMemorySorter.java  |  10 +-
 .../state/AbstractKeyedStateBackend.java        |  10 +-
 .../runtime/state/CheckpointStreamFactory.java  |   7 +-
 .../CheckpointStreamWithResultProvider.java     | 221 +++++++++++++
 .../state/DefaultOperatorStateBackend.java      |  31 +-
 .../state/DirectoryKeyedStateHandle.java        | 109 +++++++
 .../runtime/state/DirectoryStateHandle.java     |  87 +++++
 .../apache/flink/runtime/state/DoneFuture.java  |  18 +-
 .../DuplicatingCheckpointOutputStream.java      | 283 ++++++++++++++++
 .../state/IncrementalLocalKeyedStateHandle.java | 145 +++++++++
 .../flink/runtime/state/KeyedStateBackend.java  |   7 +-
 .../runtime/state/LocalRecoveryConfig.java      |  99 ++++++
 .../state/LocalRecoveryDirectoryProvider.java   |  81 +++++
 .../LocalRecoveryDirectoryProviderImpl.java     | 138 ++++++++
 .../runtime/state/MultiStreamStateHandle.java   | 104 ------
 .../runtime/state/OperatorStateBackend.java     |  13 +-
 .../OperatorStateCheckpointOutputStream.java    |   6 +-
 .../runtime/state/OperatorStateHandle.java      | 116 ++-----
 .../state/OperatorStreamStateHandle.java        | 120 +++++++
 .../flink/runtime/state/SnapshotDirectory.java  | 200 ++++++++++++
 .../flink/runtime/state/SnapshotResult.java     | 120 +++++++
 .../flink/runtime/state/SnapshotStrategy.java   |  51 +++
 .../flink/runtime/state/Snapshotable.java       |   6 +-
 .../StateSnapshotContextSynchronousImpl.java    |  37 ++-
 .../TaskExecutorLocalStateStoresManager.java    | 223 +++++++++++--
 .../runtime/state/TaskLocalStateStore.java      |  72 +++--
 .../runtime/state/TaskLocalStateStoreImpl.java  | 288 +++++++++++++++++
 .../flink/runtime/state/TaskStateManager.java   |  24 +-
 .../runtime/state/TaskStateManagerImpl.java     |  63 +++-
 .../filesystem/FileBasedStateOutputStream.java  | 157 +++++++++
 .../filesystem/FsCheckpointStreamFactory.java   |   9 +-
 .../state/filesystem/FsStateBackend.java        |  14 +-
 .../state/heap/HeapKeyedStateBackend.java       |  33 +-
 .../memory/MemCheckpointStreamFactory.java      |   6 +-
 .../state/memory/MemoryStateBackend.java        |   9 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  10 +-
 .../runtime/taskexecutor/TaskManagerRunner.java |   1 +
 .../taskexecutor/TaskManagerServices.java       |  39 ++-
 .../TaskManagerServicesConfiguration.java       |  28 ++
 .../apache/flink/runtime/taskmanager/Task.java  |  33 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |   5 +
 .../flink/runtime/taskmanager/TaskManager.scala |  40 ++-
 .../CheckpointCoordinatorFailureTest.java       |   7 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  23 +-
 .../CheckpointMetadataLoadingTest.java          |   4 +-
 .../checkpoint/CheckpointStateRestoreTest.java  |   9 +-
 .../CompletedCheckpointStoreTest.java           |  10 +-
 .../PrioritizedOperatorSubtaskStateTest.java    | 292 +++++++++++++++++
 .../checkpoint/StateHandleDummyUtil.java        | 139 ++++++++
 .../checkpoint/StateObjectCollectionTest.java   |  70 ++++
 .../checkpoint/TaskStateSnapshotTest.java       | 129 ++++++++
 .../savepoint/CheckpointTestUtils.java          |  26 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  32 +-
 .../ZooKeeperSubmittedJobGraphsStoreITCase.java |   6 +-
 .../scheduler/SchedulerTestUtils.java           |   5 +-
 .../runtime/metrics/TaskManagerMetricsTest.java |   3 +
 .../operators/testutils/DummyEnvironment.java   |   6 +
 .../operators/testutils/MockEnvironment.java    |  48 ++-
 .../CheckpointStreamWithResultProviderTest.java | 210 ++++++++++++
 .../DuplicatingCheckpointOutputStreamTest.java  | 310 ++++++++++++++++++
 .../LocalRecoveryDirectoryProviderImplTest.java | 122 +++++++
 .../runtime/state/MemoryStateBackendTest.java   |  28 +-
 .../state/MultiStreamStateHandleTest.java       | 135 --------
 .../runtime/state/OperatorStateBackendTest.java |  65 ++--
 .../runtime/state/OperatorStateHandleTest.java  |  40 ---
 .../state/OperatorStreamStateHandleTest.java    |  40 +++
 .../runtime/state/SnapshotDirectoryTest.java    | 206 ++++++++++++
 .../flink/runtime/state/SnapshotResultTest.java |  68 ++++
 .../runtime/state/StateBackendTestBase.java     | 104 +++---
 .../state/StateSnapshotCompressionTest.java     |  23 +-
 ...TaskExecutorLocalStateStoresManagerTest.java | 224 +++++++++++++
 .../state/TaskLocalStateStoreImplTest.java      | 174 ++++++++++
 .../runtime/state/TaskStateManagerImplTest.java | 190 +++++++++--
 .../runtime/state/TestLocalRecoveryConfig.java  |  69 ++++
 .../state/TestMemoryCheckpointOutputStream.java |   5 +-
 .../runtime/state/TestTaskLocalStateStore.java  | 113 +++++++
 .../runtime/state/TestTaskStateManager.java     | 125 +++++--
 ...ractCheckpointStateOutputStreamTestBase.java | 324 +++++++++++++++++++
 .../FileBasedStateOutputStreamTest.java         |  41 +++
 .../FsCheckpointMetadataOutputStreamTest.java   | 282 +---------------
 ...pKeyedStateBackendSnapshotMigrationTest.java |   5 +-
 .../state/heap/HeapStateBackendTestBase.java    |   5 +-
 .../state/testutils/BackendForTestStream.java   |   4 +-
 .../NetworkBufferCalculationTest.java           |   3 +
 .../taskexecutor/TaskExecutorITCase.java        |  12 +
 .../runtime/taskexecutor/TaskExecutorTest.java  |  80 +++++
 .../TaskManagerServicesBuilder.java             |   3 +-
 ...askManagerComponentsStartupShutdownTest.java |   9 +
 .../taskmanager/TaskManagerStartupTest.java     |  19 +-
 .../util/BlockerCheckpointStreamFactory.java    |  85 +----
 .../util/BlockingCheckpointOutputStream.java    | 202 ++++++++++++
 .../runtime/util/BlockingFSDataInputStream.java | 196 +++++++++++
 .../runtime/util/JvmExitOnFatalErrorTest.java   |  21 +-
 .../TestByteStreamStateHandleDeepCompare.java   |  54 ----
 .../testingUtils/TestingTaskManager.scala       |   5 +
 .../state/RocksDBKeyedStateBackend.java         |  39 ++-
 .../streaming/state/RocksDBStateBackend.java    |  41 +--
 .../state/RocksDBAsyncSnapshotTest.java         |  21 +-
 .../state/RocksDBStateBackendConfigTest.java    |   4 +
 .../state/RocksDBStateBackendTest.java          |  28 +-
 .../api/operators/AbstractStreamOperator.java   |   5 +-
 .../api/operators/BackendRestorerProcedure.java | 146 +++++++++
 .../operators/OperatorSnapshotFinalizer.java    |  80 +++++
 .../api/operators/OperatorSnapshotFutures.java  |  69 ++--
 .../streaming/api/operators/StreamOperator.java |   4 +-
 .../operators/StreamOperatorStateContext.java   |   1 -
 .../StreamTaskStateInitializerImpl.java         | 204 +++++-------
 .../streaming/runtime/tasks/StreamTask.java     | 129 +++++---
 .../operators/AbstractStreamOperatorTest.java   |   7 +-
 .../operators/BackendRestorerProcedureTest.java | 206 ++++++++++++
 .../OperatorSnapshotFinalizerTest.java          | 130 ++++++++
 .../operators/OperatorSnapshotFuturesTest.java  |  37 ++-
 .../StateInitializationContextImplTest.java     |  17 +-
 .../StreamTaskStateInitializerImplTest.java     |  49 +--
 .../operators/async/AsyncWaitOperatorTest.java  |   2 +-
 .../operators/windowing/WindowOperatorTest.java |   3 -
 .../tasks/InterruptSensitiveRestoreTest.java    |  14 +-
 .../runtime/tasks/LocalStateForwardingTest.java | 238 ++++++++++++++
 .../runtime/tasks/OneInputStreamTaskTest.java   |   6 +-
 .../runtime/tasks/RestoreStreamTaskTest.java    |   2 +-
 .../runtime/tasks/StreamMockEnvironment.java    |   8 +-
 .../tasks/StreamTaskTerminationTest.java        |   5 +-
 .../streaming/runtime/tasks/StreamTaskTest.java | 116 +++----
 .../runtime/tasks/StreamTaskTestHarness.java    |   2 +-
 .../tasks/TaskCheckpointingBehaviourTest.java   |  16 +-
 .../tasks/TestSpyWrapperStateBackend.java       |  82 +++++
 .../util/AbstractStreamOperatorTestHarness.java | 138 ++++----
 .../streaming/util/OperatorSnapshotUtil.java    |  25 +-
 ...tractEventTimeWindowCheckpointingITCase.java |  41 +--
 .../test/checkpointing/LocalRecoveryITCase.java | 120 +++++++
 .../test/checkpointing/RescalingITCase.java     |   1 -
 .../ResumeCheckpointManuallyITCase.java         | 117 ++++++-
 .../test/state/ManualWindowSpeedITCase.java     |   2 +-
 .../PojoSerializerUpgradeTest.java              |   8 +-
 .../flink/yarn/TestingYarnTaskManager.scala     |   4 +
 .../org/apache/flink/yarn/YarnTaskManager.scala |   3 +
 160 files changed, 8806 insertions(+), 1872 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index e76cf13..81bf0bf 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSch
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -389,34 +390,34 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
                final int parallelism3 = 3;
                final int maxParallelism = Math.max(parallelism1, 
Math.max(parallelism2, parallelism3));
 
-               List<OperatorStateHandle> operatorStateHandles = 
repartitionAndExecute(
+               List<OperatorStateHandle> operatorSubtaskState = 
repartitionAndExecute(
                        topic,
                        Collections.emptyList(),
                        parallelism1,
                        maxParallelism,
                        IntStream.range(0, parallelism1).boxed().iterator());
 
-               operatorStateHandles = repartitionAndExecute(
+               operatorSubtaskState = repartitionAndExecute(
                        topic,
-                       operatorStateHandles,
+                       operatorSubtaskState,
                        parallelism2,
                        maxParallelism,
                        IntStream.range(parallelism1,  parallelism1 + 
parallelism2).boxed().iterator());
 
-               operatorStateHandles = repartitionAndExecute(
+               operatorSubtaskState = repartitionAndExecute(
                        topic,
-                       operatorStateHandles,
+                       operatorSubtaskState,
                        parallelism3,
                        maxParallelism,
                        IntStream.range(parallelism1 + parallelism2,  
parallelism1 + parallelism2 + parallelism3).boxed().iterator());
 
                // After each previous repartitionAndExecute call, we are left 
with some lingering transactions, that would
                // not allow us to read all committed messages from the topic. 
Thus we initialize operators from
-               // operatorStateHandles once more, but without any new data. 
This should terminate all ongoing transactions.
+               // OperatorSubtaskState once more, but without any new data. 
This should terminate all ongoing transactions.
 
-               operatorStateHandles = repartitionAndExecute(
+               operatorSubtaskState = repartitionAndExecute(
                        topic,
-                       operatorStateHandles,
+                       operatorSubtaskState,
                        1,
                        maxParallelism,
                        Collections.emptyIterator());
@@ -448,10 +449,10 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
                        testHarness.setup();
 
                        testHarness.initializeState(new OperatorSubtaskState(
-                               inputStates,
-                               Collections.emptyList(),
-                               Collections.emptyList(),
-                               Collections.emptyList()));
+                               new StateObjectCollection<>(inputStates),
+                               StateObjectCollection.empty(),
+                               StateObjectCollection.empty(),
+                               StateObjectCollection.empty()));
                        testHarness.open();
 
                        if (inputData.hasNext()) {
@@ -460,9 +461,9 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
                                OperatorSubtaskState snapshot = 
testHarness.snapshot(0, 0);
 
                                
outputStates.addAll(snapshot.getManagedOperatorState());
-                               checkState(snapshot.getRawOperatorState() == 
null, "Unexpected raw operator state");
-                               checkState(snapshot.getManagedKeyedState() == 
null, "Unexpected managed keyed state");
-                               checkState(snapshot.getRawKeyedState() == null, 
"Unexpected raw keyed state");
+                               
checkState(snapshot.getRawOperatorState().isEmpty(), "Unexpected raw operator 
state");
+                               
checkState(snapshot.getManagedKeyedState().isEmpty(), "Unexpected managed keyed 
state");
+                               
checkState(snapshot.getRawKeyedState().isEmpty(), "Unexpected raw keyed state");
 
                                for (int i = 1; i < 
FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE - 1; i++) {
                                        testHarness.processElement(-nextValue, 
0);

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index c89aff2..c6af7dd 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -66,6 +66,20 @@ public class CheckpointingOptions {
                                " complete checkpoint state. Some state 
backends may not support incremental checkpoints and ignore" +
                                " this option.");
 
+       /**
+        * This option configures local recovery for this state backend.
+        */
+       public static final ConfigOption<String> LOCAL_RECOVERY = ConfigOptions
+               .key("state.backend.local-recovery")
+               .defaultValue("DISABLED");
+
+       /**
+        * The config parameter defining the root directories for storing 
file-based state for local recovery.
+        */
+       public static final ConfigOption<String> 
LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS = ConfigOptions
+               .key("taskmanager.state.local.root-dirs")
+               .noDefaultValue();
+
        // 
------------------------------------------------------------------------
        //  Options specific to the file-system-based state backends
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index e697e6f..8566a43 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.configuration;
 
+import javax.annotation.Nonnull;
+
 import java.io.File;
 
 /**
@@ -25,6 +27,8 @@ import java.io.File;
  */
 public class ConfigurationUtils {
 
+       private static final String[] EMPTY = new String[0];
+
        /**
         * Extracts the task manager directories for temporary files as defined 
by
         * {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}.
@@ -32,10 +36,30 @@ public class ConfigurationUtils {
         * @param configuration configuration object
         * @return array of configured directories (in order)
         */
+       @Nonnull
        public static String[] parseTempDirectories(Configuration 
configuration) {
-               return configuration.getString(CoreOptions.TMP_DIRS).split(",|" 
+ File.pathSeparator);
+               return 
splitPaths(configuration.getString(CoreOptions.TMP_DIRS));
+       }
+
+       /**
+        * Extracts the local state directories as defined by
+        * {@link 
CheckpointingOptions#LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS}.
+        *
+        * @param configuration configuration object
+        * @return array of configured directories (in order)
+        */
+       @Nonnull
+       public static String[] parseLocalStateDirectories(Configuration 
configuration) {
+               String configValue = 
configuration.getString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS,
 "");
+               return splitPaths(configValue);
+       }
+
+       @Nonnull
+       private static String[] splitPaths(@Nonnull String separatedPaths) {
+               return separatedPaths.length() > 0 ? separatedPaths.split(",|" 
+ File.pathSeparator) : EMPTY;
        }
 
        // Make sure that we cannot instantiate this class
-       private ConfigurationUtils() {}
+       private ConfigurationUtils() {
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index ee2ecbe..2d7fbd5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -211,10 +211,12 @@ public class LocalFileSystem extends FileSystem {
 
                if (f.isDirectory()) {
                        final File[] files = f.listFiles();
-                       for (File file : files) {
-                               final boolean del = delete(file);
-                               if (!del) {
-                                       return false;
+                       if (files != null) {
+                               for (File file : files) {
+                                       final boolean del = delete(file);
+                                       if (!del) {
+                                               return false;
+                                       }
                                }
                        }
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-core/src/main/java/org/apache/flink/util/Disposable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/Disposable.java 
b/flink-core/src/main/java/org/apache/flink/util/Disposable.java
new file mode 100644
index 0000000..12ef763
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/Disposable.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * Interface for classes that can be disposed, i.e. that have a dedicated 
lifecycle step to "destroy" the object. One
+ * reason for this is, for example, to release native resources. From this 
point, the interface fulfills a similar purpose
+ * as the {@link java.io.Closeable} interface, but sometimes both should be 
represented as isolated, independent
+ * lifecycle steps.
+ */
+public interface Disposable {
+
+       /**
+        * Disposes the object and releases all resources. After calling this 
method, calling any methods on the
+        * object may result in undefined behavior.
+        *
+        * @throws Exception if something goes wrong during disposal.
+        */
+       void dispose() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java 
b/flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java
new file mode 100644
index 0000000..8a9e8ca
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.mockito.Mockito;
+import org.mockito.internal.util.MockUtil;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Helper class with a method that attempts to automatically test method 
forwarding between a delegate and a wrapper.
+ */
+public class MethodForwardingTestUtil {
+
+       /**
+        * This is a best effort automatic test for method forwarding between a 
delegate and its wrapper, where the wrapper
+        * class is a subtype of the delegate. This ignores methods that are 
inherited from Object.
+        *
+        * @param delegateClass the class for the delegate.
+        * @param wrapperFactory factory that produces a wrapper from a 
delegate.
+        * @param <D> type of the delegate
+        * @param <W> type of the wrapper
+        */
+       public static <D, W> void testMethodForwarding(
+               Class<D> delegateClass,
+               Function<D, W> wrapperFactory)
+               throws NoSuchMethodException, IllegalAccessException, 
InvocationTargetException {
+               testMethodForwarding(delegateClass, wrapperFactory, () -> 
spy(delegateClass), Collections.emptySet());
+       }
+
+       /**
+        * This is a best effort automatic test for method forwarding between a 
delegate and its wrapper, where the wrapper
+        * class is a subtype of the delegate. This ignores methods that are 
inherited from Object.
+        *
+        * @param delegateClass the class for the delegate.
+        * @param wrapperFactory factory that produces a wrapper from a 
delegate.
+        * @param delegateObjectSupplier supplier for the delegate object 
passed to the wrapper factory.
+        * @param <D> type of the delegate
+        * @param <W> type of the wrapper
+        * @param <I> type of the object created as delegate, is a subtype of D.
+        */
+       public static <D, W, I extends D> void testMethodForwarding(
+               Class<D> delegateClass,
+               Function<I, W> wrapperFactory,
+               Supplier<I> delegateObjectSupplier)
+               throws NoSuchMethodException, IllegalAccessException, 
InvocationTargetException {
+               testMethodForwarding(delegateClass, wrapperFactory, 
delegateObjectSupplier, Collections.emptySet());
+       }
+
+       /**
+        * This is a best effort automatic test for method forwarding between a 
delegate and its wrapper, where the wrapper
+        * class is a subtype of the delegate. Methods can be remapped in case 
that the implementation does not call the
+        * original method. Remapping to null skips the method. This ignores 
methods that are inherited from Object.
+        *
+        * @param delegateClass the class for the delegate.
+        * @param wrapperFactory factory that produces a wrapper from a 
delegate.
+        * @param delegateObjectSupplier supplier for the delegate object 
passed to the wrapper factory.
+        * @param skipMethodSet set of methods to ignore.
+        * @param <D> type of the delegate
+        * @param <W> type of the wrapper
+        * @param <I> type of the object created as delegate, is a subtype of D.
+        */
+       public static <D, W, I extends D> void testMethodForwarding(
+               Class<D> delegateClass,
+               Function<I, W> wrapperFactory,
+               Supplier<I> delegateObjectSupplier,
+               Set<Method> skipMethodSet) throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException {
+
+               Preconditions.checkNotNull(delegateClass);
+               Preconditions.checkNotNull(wrapperFactory);
+               Preconditions.checkNotNull(skipMethodSet);
+
+               I delegate = delegateObjectSupplier.get();
+
+               //check if we need to wrap the delegate object as a spy, or if 
it is already testable with Mockito.
+               MockUtil mockUtil = new MockUtil();
+               if (!mockUtil.isSpy(delegate) || !mockUtil.isMock(delegate)) {
+                       delegate = spy(delegate);
+               }
+
+               W wrapper = wrapperFactory.apply(delegate);
+
+               // ensure that wrapper is a subtype of delegate
+               
Preconditions.checkArgument(delegateClass.isAssignableFrom(wrapper.getClass()));
+
+               for (Method delegateMethod : delegateClass.getMethods()) {
+
+                       if (checkSkipMethodForwardCheck(delegateMethod, 
skipMethodSet)) {
+                               continue;
+                       }
+
+                       // find the correct method to substitute the bridge for 
erased generic types.
+                       // if this doesn't work, the user needs to exclude the 
method and write an additional test.
+                       Method wrapperMethod = wrapper.getClass().getMethod(
+                               delegateMethod.getName(),
+                               delegateMethod.getParameterTypes());
+
+                       // things get a bit fuzzy here, best effort to find a 
match but this might end up with a wrong method.
+                       if (wrapperMethod.isBridge()) {
+                               for (Method method : 
wrapper.getClass().getMethods()) {
+                                       if (!method.isBridge()
+                                               && 
method.getName().equals(wrapperMethod.getName())
+                                               && method.getParameterCount() 
== wrapperMethod.getParameterCount()) {
+                                               wrapperMethod = method;
+                                               break;
+                                       }
+                               }
+                       }
+
+                       Class<?>[] parameterTypes = 
wrapperMethod.getParameterTypes();
+                       Object[] arguments = new Object[parameterTypes.length];
+                       for (int j = 0; j < arguments.length; j++) {
+                               Class<?> parameterType = parameterTypes[j];
+                               if (parameterType.isArray()) {
+                                       arguments[j] = 
Array.newInstance(parameterType.getComponentType(), 0);
+                               } else if (parameterType.isPrimitive()) {
+                                       if 
(boolean.class.equals(parameterType)) {
+                                               arguments[j] = false;
+                                       } else if 
(char.class.equals(parameterType)) {
+                                               arguments[j] = 'a';
+                                       } else {
+                                               arguments[j] = (byte) 0;
+                                       }
+                               } else {
+                                       arguments[j] = 
Mockito.mock(parameterType);
+                               }
+                       }
+
+                       wrapperMethod.invoke(wrapper, arguments);
+                       delegateMethod.invoke(Mockito.verify(delegate, 
Mockito.times(1)), arguments);
+                       reset(delegate);
+               }
+       }
+
+       /**
+        * Test if this method should be skipped in our check for proper 
forwarding, e.g. because it is just a bridge.
+        */
+       private static boolean checkSkipMethodForwardCheck(Method 
delegateMethod, Set<Method> skipMethods) {
+
+               if (delegateMethod.isBridge()
+                       || delegateMethod.isDefault()
+                       || skipMethods.contains(delegateMethod)) {
+                       return true;
+               }
+
+               // skip methods declared in Object (Mockito doesn't like them)
+               try {
+                       Object.class.getMethod(delegateMethod.getName(), 
delegateMethod.getParameterTypes());
+                       return true;
+               } catch (Exception ignore) {
+               }
+               return false;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index b3a5def..be5f677 100644
--- 
a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ 
b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -120,9 +120,7 @@ public class RestAPIDocGenerator {
                List<MessageHeaders> specs = restEndpoint.getSpecs();
                specs.forEach(spec -> html.append(createHtmlEntry(spec)));
 
-               if (Files.exists(outputFile)) {
-                       Files.delete(outputFile);
-               }
+               Files.deleteIfExists(outputFile);
                Files.write(outputFile, 
html.toString().getBytes(StandardCharsets.UTF_8));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
index e69472e..12cc375 100644
--- 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
+++ 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 
@@ -37,6 +38,7 @@ class MesosTaskManager(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     network: NetworkEnvironment,
+    taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager,
     numberOfSlots: Int,
     highAvailabilityServices: HighAvailabilityServices,
     taskManagerMetricGroup : TaskManagerMetricGroup)
@@ -47,6 +49,7 @@ class MesosTaskManager(
     memoryManager,
     ioManager,
     network,
+    taskManagerLocalStateStoresManager,
     numberOfSlots,
     highAvailabilityServices,
     taskManagerMetricGroup) {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index 07517ab..dd75dd6 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.queryablestate.client.VoidNamespace;
 import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 
@@ -76,7 +77,8 @@ public final class KVStateRequestSerializerRocksDBTest {
                        super(operatorIdentifier, userCodeClassLoader,
                                instanceBasePath,
                                dbOptions, columnFamilyOptions, 
kvStateRegistry, keySerializer,
-                               numberOfKeyGroups, keyGroupRange, 
executionConfig, false);
+                               numberOfKeyGroups, keyGroupRange, 
executionConfig, false,
+                               TestLocalRecoveryConfig.disabled());
                }
 
                @Override
@@ -152,7 +154,8 @@ public final class KVStateRequestSerializerRocksDBTest {
                                LongSerializer.INSTANCE,
                                1, new KeyGroupRange(0, 0),
                                new ExecutionConfig(),
-                               false);
+                               false,
+                               TestLocalRecoveryConfig.disabled());
                longHeapKeyedStateBackend.restore(null);
                longHeapKeyedStateBackend.setCurrentKey(key);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index d3314ab..8d10141 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
 import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -194,7 +195,8 @@ public class KvStateRequestSerializerTest {
                                1,
                                new KeyGroupRange(0, 0),
                                async,
-                               new ExecutionConfig()
+                               new ExecutionConfig(),
+                               TestLocalRecoveryConfig.disabled()
                        );
                longHeapKeyedStateBackend.setCurrentKey(key);
 
@@ -290,13 +292,14 @@ public class KvStateRequestSerializerTest {
                // objects for heap state list serialisation
                final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
                        new HeapKeyedStateBackend<>(
-                                       mock(TaskKvStateRegistry.class),
-                                       LongSerializer.INSTANCE,
-                                       ClassLoader.getSystemClassLoader(),
-                                       1,
-                                       new KeyGroupRange(0, 0),
-                                       async,
-                                       new ExecutionConfig()
+                               mock(TaskKvStateRegistry.class),
+                               LongSerializer.INSTANCE,
+                               ClassLoader.getSystemClassLoader(),
+                               1,
+                               new KeyGroupRange(0, 0),
+                               async,
+                               new ExecutionConfig(),
+                               TestLocalRecoveryConfig.disabled()
                        );
                longHeapKeyedStateBackend.setCurrentKey(key);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 58d532b..450436f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -180,9 +180,7 @@ class HistoryServerArchiveFetcher {
 
                                                                        // We 
overwrite existing files since this may be another attempt at fetching this 
archive.
                                                                        // 
Existing files may be incomplete/corrupt.
-                                                                       if 
(Files.exists(targetPath)) {
-                                                                               
Files.delete(targetPath);
-                                                                       }
+                                                                       
Files.deleteIfExists(targetPath);
 
                                                                        
Files.createFile(target.toPath());
                                                                        try 
(FileWriter fw = new FileWriter(target)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java
index d5ac3e0..2537db1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
 import java.io.Serializable;
 
 /**
@@ -27,11 +30,13 @@ public class JobManagerTaskRestore implements Serializable {
 
        private static final long serialVersionUID = 1L;
 
+       /** The id of the checkpoint from which we restore. */
        private final long restoreCheckpointId;
 
+       /** The state for this task to restore. */
        private final TaskStateSnapshot taskStateSnapshot;
 
-       public JobManagerTaskRestore(long restoreCheckpointId, 
TaskStateSnapshot taskStateSnapshot) {
+       public JobManagerTaskRestore(@Nonnegative long restoreCheckpointId, 
@Nonnull TaskStateSnapshot taskStateSnapshot) {
                this.restoreCheckpointId = restoreCheckpointId;
                this.taskStateSnapshot = taskStateSnapshot;
        }
@@ -40,13 +45,14 @@ public class JobManagerTaskRestore implements Serializable {
                return restoreCheckpointId;
        }
 
+       @Nonnull
        public TaskStateSnapshot getTaskStateSnapshot() {
                return taskStateSnapshot;
        }
 
        @Override
        public String toString() {
-               return "TaskRestore{" +
+               return "JobManagerTaskRestore{" +
                        "restoreCheckpointId=" + restoreCheckpointId +
                        ", taskStateSnapshot=" + taskStateSnapshot +
                        '}';

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index 3df9c4f..c2b8e9a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.state.CompositeStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
@@ -30,10 +30,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -50,8 +49,6 @@ import java.util.List;
  * Under normal circumstances, the expected size of each collection is still 0 
or 1, except for scale-down. In
  * scale-down, one operator subtask can become responsible for the state of 
multiple previous subtasks. The collections
  * can then store all the state handles that are relevant to build up the new 
subtask state.
- *
- * <p>There is no collection for legacy state because it is not rescalable.
  */
 public class OperatorSubtaskState implements CompositeStateHandle {
 
@@ -63,25 +60,25 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
         * Snapshot from the {@link 
org.apache.flink.runtime.state.OperatorStateBackend}.
         */
        @Nonnull
-       private final Collection<OperatorStateHandle> managedOperatorState;
+       private final StateObjectCollection<OperatorStateHandle> 
managedOperatorState;
 
        /**
         * Snapshot written using {@link 
org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream}.
         */
        @Nonnull
-       private final Collection<OperatorStateHandle> rawOperatorState;
+       private final StateObjectCollection<OperatorStateHandle> 
rawOperatorState;
 
        /**
         * Snapshot from {@link 
org.apache.flink.runtime.state.KeyedStateBackend}.
         */
        @Nonnull
-       private final Collection<KeyedStateHandle> managedKeyedState;
+       private final StateObjectCollection<KeyedStateHandle> managedKeyedState;
 
        /**
         * Snapshot written using {@link 
org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream}.
         */
        @Nonnull
-       private final Collection<KeyedStateHandle> rawKeyedState;
+       private final StateObjectCollection<KeyedStateHandle> rawKeyedState;
 
        /**
         * The state size. This is also part of the deserialized state handle.
@@ -95,43 +92,39 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
         */
        public OperatorSubtaskState() {
                this(
-                       Collections.emptyList(),
-                       Collections.emptyList(),
-                       Collections.emptyList(),
-                       Collections.emptyList());
+                       StateObjectCollection.empty(),
+                       StateObjectCollection.empty(),
+                       StateObjectCollection.empty(),
+                       StateObjectCollection.empty());
        }
 
        public OperatorSubtaskState(
-               Collection<OperatorStateHandle> managedOperatorState,
-               Collection<OperatorStateHandle> rawOperatorState,
-               Collection<KeyedStateHandle> managedKeyedState,
-               Collection<KeyedStateHandle> rawKeyedState) {
+               @Nonnull StateObjectCollection<OperatorStateHandle> 
managedOperatorState,
+               @Nonnull StateObjectCollection<OperatorStateHandle> 
rawOperatorState,
+               @Nonnull StateObjectCollection<KeyedStateHandle> 
managedKeyedState,
+               @Nonnull StateObjectCollection<KeyedStateHandle> rawKeyedState) 
{
 
                this.managedOperatorState = 
Preconditions.checkNotNull(managedOperatorState);
                this.rawOperatorState = 
Preconditions.checkNotNull(rawOperatorState);
                this.managedKeyedState = 
Preconditions.checkNotNull(managedKeyedState);
                this.rawKeyedState = Preconditions.checkNotNull(rawKeyedState);
 
-               try {
-                       long calculateStateSize = 
sumAllSizes(managedOperatorState);
-                       calculateStateSize += sumAllSizes(rawOperatorState);
-                       calculateStateSize += sumAllSizes(managedKeyedState);
-                       calculateStateSize += sumAllSizes(rawKeyedState);
-                       stateSize = calculateStateSize;
-               } catch (Exception e) {
-                       throw new RuntimeException("Failed to get state size.", 
e);
-               }
+               long calculateStateSize = managedOperatorState.getStateSize();
+               calculateStateSize += rawOperatorState.getStateSize();
+               calculateStateSize += managedKeyedState.getStateSize();
+               calculateStateSize += rawKeyedState.getStateSize();
+               stateSize = calculateStateSize;
        }
 
        /**
         * For convenience because the size of the collections is typically 0 
or 1. Null values are translated into empty
-        * Collections (except for legacy state).
+        * Collections.
         */
        public OperatorSubtaskState(
-               OperatorStateHandle managedOperatorState,
-               OperatorStateHandle rawOperatorState,
-               KeyedStateHandle managedKeyedState,
-               KeyedStateHandle rawKeyedState) {
+               @Nullable OperatorStateHandle managedOperatorState,
+               @Nullable OperatorStateHandle rawOperatorState,
+               @Nullable KeyedStateHandle managedKeyedState,
+               @Nullable KeyedStateHandle rawKeyedState) {
 
                this(
                        singletonOrEmptyOnNull(managedOperatorState),
@@ -140,21 +133,8 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
                        singletonOrEmptyOnNull(rawKeyedState));
        }
 
-       private static <T> Collection<T> singletonOrEmptyOnNull(T element) {
-               return element != null ? Collections.singletonList(element) : 
Collections.<T>emptyList();
-       }
-
-       private static long sumAllSizes(Collection<? extends StateObject> 
stateObject) throws Exception {
-               long size = 0L;
-               for (StateObject object : stateObject) {
-                       size += getSizeNullSafe(object);
-               }
-
-               return size;
-       }
-
-       private static long getSizeNullSafe(StateObject stateObject) throws 
Exception {
-               return stateObject != null ? stateObject.getStateSize() : 0L;
+       private static <T extends StateObject> StateObjectCollection<T> 
singletonOrEmptyOnNull(T element) {
+               return element != null ? 
StateObjectCollection.singleton(element) : StateObjectCollection.empty();
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -163,7 +143,7 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
         * Returns a handle to the managed operator state.
         */
        @Nonnull
-       public Collection<OperatorStateHandle> getManagedOperatorState() {
+       public StateObjectCollection<OperatorStateHandle> 
getManagedOperatorState() {
                return managedOperatorState;
        }
 
@@ -171,7 +151,7 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
         * Returns a handle to the raw operator state.
         */
        @Nonnull
-       public Collection<OperatorStateHandle> getRawOperatorState() {
+       public StateObjectCollection<OperatorStateHandle> getRawOperatorState() 
{
                return rawOperatorState;
        }
 
@@ -179,7 +159,7 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
         * Returns a handle to the managed keyed state.
         */
        @Nonnull
-       public Collection<KeyedStateHandle> getManagedKeyedState() {
+       public StateObjectCollection<KeyedStateHandle> getManagedKeyedState() {
                return managedKeyedState;
        }
 
@@ -187,7 +167,7 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
         * Returns a handle to the raw keyed state.
         */
        @Nonnull
-       public Collection<KeyedStateHandle> getRawKeyedState() {
+       public StateObjectCollection<KeyedStateHandle> getRawKeyedState() {
                return rawKeyedState;
        }
 
@@ -281,18 +261,9 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
        }
 
        public boolean hasState() {
-               return hasState(managedOperatorState)
-                       || hasState(rawOperatorState)
-                       || hasState(managedKeyedState)
-                       || hasState(rawKeyedState);
-       }
-
-       private boolean hasState(Iterable<? extends StateObject> states) {
-               for (StateObject state : states) {
-                       if (state != null) {
-                               return true;
-                       }
-               }
-               return false;
+               return managedOperatorState.hasState()
+                       || rawOperatorState.hasState()
+                       || managedKeyedState.hasState()
+                       || rawKeyedState.hasState();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
new file mode 100644
index 0000000..f48d311
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiFunction;
+
+/**
+ * This class is a wrapper over multiple alternative {@link 
OperatorSubtaskState} that are (partial) substitutes for
+ * each other and imposes a priority ordering over all alternatives for the 
different states which defines an order in
+ * which the operator should attempt to restore the state from them. One 
OperatorSubtaskState is considered as the
+ * "ground truth" about which state should be represented. Alternatives may be 
complete or partial substitutes for
+ * the "ground truth" with a higher priority (if they had a lower priority, 
they would not really be alternatives).
+ * Substitution is determined on a per-sub-state basis.
+ */
+@Internal
+public class PrioritizedOperatorSubtaskState {
+
+       /** Singleton instance for an empty, non-restored operator state. */
+       private static final PrioritizedOperatorSubtaskState 
EMPTY_NON_RESTORED_INSTANCE =
+               new PrioritizedOperatorSubtaskState.Builder(new 
OperatorSubtaskState(), Collections.emptyList(), false)
+                       .build();
+
+       /** List of prioritized snapshot alternatives for managed operator 
state. */
+       private final List<StateObjectCollection<OperatorStateHandle>> 
prioritizedManagedOperatorState;
+
+       /** List of prioritized snapshot alternatives for raw operator state. */
+       private final List<StateObjectCollection<OperatorStateHandle>> 
prioritizedRawOperatorState;
+
+       /** List of prioritized snapshot alternatives for managed keyed state. 
*/
+       private final List<StateObjectCollection<KeyedStateHandle>> 
prioritizedManagedKeyedState;
+
+       /** List of prioritized snapshot alternatives for raw keyed state. */
+       private final List<StateObjectCollection<KeyedStateHandle>> 
prioritizedRawKeyedState;
+
+       /** Signal flag if this represents state for a restored operator. */
+       private final boolean restored;
+
+       PrioritizedOperatorSubtaskState(
+               @Nonnull List<StateObjectCollection<KeyedStateHandle>> 
prioritizedManagedKeyedState,
+               @Nonnull List<StateObjectCollection<KeyedStateHandle>> 
prioritizedRawKeyedState,
+               @Nonnull List<StateObjectCollection<OperatorStateHandle>> 
prioritizedManagedOperatorState,
+               @Nonnull List<StateObjectCollection<OperatorStateHandle>> 
prioritizedRawOperatorState,
+               boolean restored) {
+
+               this.prioritizedManagedOperatorState = 
prioritizedManagedOperatorState;
+               this.prioritizedRawOperatorState = prioritizedRawOperatorState;
+               this.prioritizedManagedKeyedState = 
prioritizedManagedKeyedState;
+               this.prioritizedRawKeyedState = prioritizedRawKeyedState;
+               this.restored = restored;
+       }
+
+       // 
-----------------------------------------------------------------------------------------------------------------
+
+       /**
+        * Returns an iterator over all alternative snapshots to restore the 
managed operator state, in the order in which
+        * we should attempt to restore.
+        */
+       @Nonnull
+       public Iterator<StateObjectCollection<OperatorStateHandle>> 
getPrioritizedManagedOperatorState() {
+               return prioritizedManagedOperatorState.iterator();
+       }
+
+       /**
+        * Returns an iterator over all alternative snapshots to restore the 
raw operator state, in the order in which we
+        * should attempt to restore.
+        */
+       @Nonnull
+       public Iterator<StateObjectCollection<OperatorStateHandle>> 
getPrioritizedRawOperatorState() {
+               return prioritizedRawOperatorState.iterator();
+       }
+
+       /**
+        * Returns an iterator over all alternative snapshots to restore the 
managed keyed state, in the order in which we
+        * should attempt to restore.
+        */
+       @Nonnull
+       public Iterator<StateObjectCollection<KeyedStateHandle>> 
getPrioritizedManagedKeyedState() {
+               return prioritizedManagedKeyedState.iterator();
+       }
+
+       /**
+        * Returns an iterator over all alternative snapshots to restore the 
raw keyed state, in the order in which we
+        * should attempt to restore.
+        */
+       @Nonnull
+       public Iterator<StateObjectCollection<KeyedStateHandle>> 
getPrioritizedRawKeyedState() {
+               return prioritizedRawKeyedState.iterator();
+       }
+
+       // 
-----------------------------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the managed operator state from the job manager, which 
represents the ground truth about what this state
+        * should represent. This is the alternative with lowest priority.
+        */
+       @Nonnull
+       public StateObjectCollection<OperatorStateHandle> 
getJobManagerManagedOperatorState() {
+               return lastElement(prioritizedManagedOperatorState);
+       }
+
+       /**
+        * Returns the raw operator state from the job manager, which 
represents the ground truth about what this state
+        * should represent. This is the alternative with lowest priority.
+        */
+       @Nonnull
+       public StateObjectCollection<OperatorStateHandle> 
getJobManagerRawOperatorState() {
+               return lastElement(prioritizedRawOperatorState);
+       }
+
+       /**
+        * Returns the managed keyed state from the job manager, which 
represents the ground truth about what this state
+        * should represent. This is the alternative with lowest priority.
+        */
+       @Nonnull
+       public StateObjectCollection<KeyedStateHandle> 
getJobManagerManagedKeyedState() {
+               return lastElement(prioritizedManagedKeyedState);
+       }
+
+       /**
+        * Returns the raw keyed state from the job manager, which represents 
the ground truth about what this state
+        * should represent. This is the alternative with lowest priority.
+        */
+       @Nonnull
+       public StateObjectCollection<KeyedStateHandle> 
getJobManagerRawKeyedState() {
+               return lastElement(prioritizedRawKeyedState);
+       }
+
+       // 
-----------------------------------------------------------------------------------------------------------------
+
+       /**
+        * Returns true if this was created for a restored operator, false 
otherwise. Restored operators are operators that
+        * participated in a previous checkpoint, even if they did not emit any 
state snapshots.
+        */
+       public boolean isRestored() {
+               return restored;
+       }
+
+
+       private static <T extends StateObject> StateObjectCollection<T> 
lastElement(List<StateObjectCollection<T>> list) {
+               return list.get(list.size() - 1);
+       }
+
+       /**
+        * Returns an empty {@link PrioritizedOperatorSubtaskState} singleton 
for an empty, not-restored operator state.
+        */
+       public static PrioritizedOperatorSubtaskState emptyNotRestored() {
+               return EMPTY_NON_RESTORED_INSTANCE;
+       }
+
+       @Internal
+       public static class Builder {
+
+               /** Ground truth of state, provided by job manager. */
+               @Nonnull
+               private final OperatorSubtaskState jobManagerState;
+
+               /** (Local) alternatives to the job manager state. */
+               @Nonnull
+               private final List<OperatorSubtaskState> alternativesByPriority;
+
+               /** Flag if the states have been restored. */
+               private final boolean restored;
+
+               public Builder(
+                       @Nonnull OperatorSubtaskState jobManagerState,
+                       @Nonnull List<OperatorSubtaskState> 
alternativesByPriority) {
+                       this(jobManagerState, alternativesByPriority, true);
+               }
+
+               public Builder(
+                       @Nonnull OperatorSubtaskState jobManagerState,
+                       @Nonnull List<OperatorSubtaskState> 
alternativesByPriority,
+                       boolean restored) {
+
+                       this.jobManagerState = jobManagerState;
+                       this.alternativesByPriority = alternativesByPriority;
+                       this.restored = restored;
+               }
+
+               public PrioritizedOperatorSubtaskState build() {
+                       int size = alternativesByPriority.size();
+                       List<StateObjectCollection<OperatorStateHandle>> 
managedOperatorAlternatives = new ArrayList<>(size);
+                       List<StateObjectCollection<KeyedStateHandle>> 
managedKeyedAlternatives = new ArrayList<>(size);
+                       List<StateObjectCollection<OperatorStateHandle>> 
rawOperatorAlternatives = new ArrayList<>(size);
+                       List<StateObjectCollection<KeyedStateHandle>> 
rawKeyedAlternatives = new ArrayList<>(size);
+
+                       for (OperatorSubtaskState subtaskState : 
alternativesByPriority) {
+
+                               if (subtaskState != null) {
+                                       
managedKeyedAlternatives.add(subtaskState.getManagedKeyedState());
+                                       
rawKeyedAlternatives.add(subtaskState.getRawKeyedState());
+                                       
managedOperatorAlternatives.add(subtaskState.getManagedOperatorState());
+                                       
rawOperatorAlternatives.add(subtaskState.getRawOperatorState());
+                               }
+                       }
+
+                       // Key-groups should match.
+                       BiFunction<KeyedStateHandle, KeyedStateHandle, Boolean> 
keyedStateApprover =
+                               (ref, alt) -> 
ref.getKeyGroupRange().equals(alt.getKeyGroupRange());
+
+                       // State meta data should match.
+                       BiFunction<OperatorStateHandle, OperatorStateHandle, 
Boolean> operatorStateApprover =
+                               (ref, alt) -> 
ref.getStateNameToPartitionOffsets().equals(alt.getStateNameToPartitionOffsets());
+
+                       return new PrioritizedOperatorSubtaskState(
+                               resolvePrioritizedAlternatives(
+                                       jobManagerState.getManagedKeyedState(),
+                                       managedKeyedAlternatives,
+                                       keyedStateApprover),
+                               resolvePrioritizedAlternatives(
+                                       jobManagerState.getRawKeyedState(),
+                                       rawKeyedAlternatives,
+                                       keyedStateApprover),
+                               resolvePrioritizedAlternatives(
+                                       
jobManagerState.getManagedOperatorState(),
+                                       managedOperatorAlternatives,
+                                       operatorStateApprover),
+                               resolvePrioritizedAlternatives(
+                                       jobManagerState.getRawOperatorState(),
+                                       rawOperatorAlternatives,
+                                       operatorStateApprover),
+                               restored);
+               }
+
+               /**
+                * Builds the prioritized list of state collections to try on restore: every approved alternative
+                * (e.g. task-local state) in the given priority order, followed by the job manager state, which is
+                * the ground truth and therefore always the last entry.
+                *
+                * @param jobManagerState the state reported by the job manager (ground truth).
+                * @param alternativesByPriority candidate alternatives, highest priority first; may be null or empty.
+                * @param approveFun decides whether a single alternative handle may stand in for the single job
+                *                   manager handle.
+                * @param <T> type of the state objects.
+                * @return an unmodifiable list that ends with the job manager state.
+                */
+               protected <T extends StateObject> List<StateObjectCollection<T>> resolvePrioritizedAlternatives(
+                       StateObjectCollection<T> jobManagerState,
+                       List<StateObjectCollection<T>> alternativesByPriority,
+                       BiFunction<T, T, Boolean> approveFun) {
+
+                       // Alternatives can only be matched if there are any, and if the ground truth consists of
+                       // exactly one handle. More than one handle hints at a rescaling (this is more a sanity check).
+                       final boolean resolvable =
+                               alternativesByPriority != null
+                                       && !alternativesByPriority.isEmpty()
+                                       && jobManagerState.hasState()
+                                       && jobManagerState.size() == 1;
+
+                       if (!resolvable) {
+                               return Collections.singletonList(jobManagerState);
+                       }
+
+                       // Safe, because the size was checked to be exactly one.
+                       final T groundTruth = jobManagerState.iterator().next();
+
+                       // Sized for the worst case: all alternatives approved, plus the ground truth itself.
+                       final List<StateObjectCollection<T>> result =
+                               new ArrayList<>(alternativesByPriority.size() + 1);
+
+                       for (StateObjectCollection<T> candidate : alternativesByPriority) {
+
+                               // Only candidates with state and a 1:1 relationship to the ground truth qualify.
+                               if (candidate == null || !candidate.hasState() || candidate.size() != 1) {
+                                       continue;
+                               }
+
+                               if (approveFun.apply(groundTruth, candidate.iterator().next())) {
+                                       result.add(candidate);
+                               }
+                       }
+
+                       // The ground truth always comes last.
+                       result.add(jobManagerState);
+                       return Collections.unmodifiableList(result);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
index e09b677..e6fa687 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
 
@@ -80,7 +81,10 @@ public class RoundRobinOperatorStateRepartitioner implements 
OperatorStateRepart
                                new EnumMap<>(OperatorStateHandle.Mode.class);
 
                for (OperatorStateHandle.Mode mode : 
OperatorStateHandle.Mode.values()) {
-                       nameToStateByMode.put(mode, new HashMap<>());
+
+                       nameToStateByMode.put(
+                                       mode,
+                                       new HashMap<>());
                }
 
                for (OperatorStateHandle psh : previousParallelSubtaskStates) {
@@ -171,8 +175,6 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
                                }
 
                                // Now start collection the partitions for the 
parallel instance into this list
-                               List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>> parallelOperatorState =
-                                               new ArrayList<>();
 
                                while (numberOfPartitionsToAssign > 0) {
                                        Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo> handleWithOffsets =
@@ -194,10 +196,6 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
                                                ++lstIdx;
                                        }
 
-                                       parallelOperatorState.add(new Tuple2<>(
-                                                       handleWithOffsets.f0,
-                                                       new 
OperatorStateHandle.StateMetaInfo(offs, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)));
-
                                        numberOfPartitionsToAssign -= remaining;
 
                                        // As a last step we merge partitions 
that use the same StreamStateHandle in a single
@@ -205,7 +203,7 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
                                        Map<StreamStateHandle, 
OperatorStateHandle> mergeMap = mergeMapList.get(parallelOpIdx);
                                        OperatorStateHandle operatorStateHandle 
= mergeMap.get(handleWithOffsets.f0);
                                        if (operatorStateHandle == null) {
-                                               operatorStateHandle = new 
OperatorStateHandle(new HashMap<>(), handleWithOffsets.f0);
+                                               operatorStateHandle = new 
OperatorStreamStateHandle(new HashMap<>(), handleWithOffsets.f0);
                                                
mergeMap.put(handleWithOffsets.f0, operatorStateHandle);
                                        }
                                        
operatorStateHandle.getStateNameToPartitionOffsets().put(
@@ -231,7 +229,7 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
                                for (Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo> handleWithMetaInfo : e.getValue()) {
                                        OperatorStateHandle operatorStateHandle 
= mergeMap.get(handleWithMetaInfo.f0);
                                        if (operatorStateHandle == null) {
-                                               operatorStateHandle = new 
OperatorStateHandle(new HashMap<>(), handleWithMetaInfo.f0);
+                                               operatorStateHandle = new 
OperatorStreamStateHandle(new HashMap<>(), handleWithMetaInfo.f0);
                                                
mergeMap.put(handleWithMetaInfo.f0, operatorStateHandle);
                                        }
                                        
operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(), 
handleWithMetaInfo.f1);
@@ -252,13 +250,13 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
                                        uniformBroadcastNameToState.entrySet()) 
{
 
                                int oldParallelism = e.getValue().size();
-                               
+
                                Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo> handleWithMetaInfo =
                                                        e.getValue().get(i % 
oldParallelism);
 
                                OperatorStateHandle operatorStateHandle = 
mergeMap.get(handleWithMetaInfo.f0);
                                if (operatorStateHandle == null) {
-                                       operatorStateHandle = new 
OperatorStateHandle(new HashMap<>(), handleWithMetaInfo.f0);
+                                       operatorStateHandle = new 
OperatorStreamStateHandle(new HashMap<>(), handleWithMetaInfo.f0);
                                        mergeMap.put(handleWithMetaInfo.f0, 
operatorStateHandle);
                                }
                                
operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(), 
handleWithMetaInfo.f1);
@@ -282,4 +280,4 @@ public class RoundRobinOperatorStateRepartitioner 
implements OperatorStateRepart
                        return byMode.get(mode);
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 43a4b01..592489f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -24,11 +24,11 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -242,10 +242,10 @@ public class StateAssignmentOperation {
                        checkState(!subRawKeyedState.containsKey(instanceID));
                }
                return new OperatorSubtaskState(
-                       subManagedOperatorState.getOrDefault(instanceID, 
Collections.emptyList()),
-                       subRawOperatorState.getOrDefault(instanceID, 
Collections.emptyList()),
-                       subManagedKeyedState.getOrDefault(instanceID, 
Collections.emptyList()),
-                       subRawKeyedState.getOrDefault(instanceID, 
Collections.emptyList()));
+                       new 
StateObjectCollection<>(subManagedOperatorState.getOrDefault(instanceID, 
Collections.emptyList())),
+                       new 
StateObjectCollection<>(subRawOperatorState.getOrDefault(instanceID, 
Collections.emptyList())),
+                       new 
StateObjectCollection<>(subManagedKeyedState.getOrDefault(instanceID, 
Collections.emptyList())),
+                       new 
StateObjectCollection<>(subRawKeyedState.getOrDefault(instanceID, 
Collections.emptyList())));
        }
 
        private static boolean isHeadOperator(int opIdx, List<OperatorID> 
operatorIDs) {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
new file mode 100644
index 0000000..38e3d15
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StateUtil;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.function.Predicate;
+
+/**
+ * This class represents a generic collection for {@link StateObject}s. Being a state object itself, it delegates
+ * {@link #discardState()} to all contained state objects and computes {@link #getStateSize()} as sum of the state
+ * sizes of all contained objects.
+ *
+ * <p>All {@link Collection} methods are forwarded to the wrapped collection. Mutating methods therefore fail with
+ * an {@link UnsupportedOperationException} if the wrapped collection is immutable, which is the case for the
+ * instances returned by {@link #empty()} and {@link #singleton(StateObject)}.
+ *
+ * @param <T> type of the contained state objects.
+ */
+public class StateObjectCollection<T extends StateObject> implements Collection<T>, StateObject {
+
+       private static final long serialVersionUID = 1L;
+
+       /** The empty StateObjectCollection. */
+       private static final StateObjectCollection<?> EMPTY = new StateObjectCollection<>(Collections.emptyList());
+
+       /** Wrapped collection that contains the state objects. */
+       private final Collection<T> stateObjects;
+
+       /**
+        * Creates a new StateObjectCollection that is backed by an {@link ArrayList}.
+        */
+       public StateObjectCollection() {
+               this.stateObjects = new ArrayList<>();
+       }
+
+       /**
+        * Creates a new StateObjectCollection that wraps the given collection and delegates to it.
+        *
+        * @param stateObjects collection of state objects to wrap. If null, an immutable empty list is wrapped instead.
+        */
+       public StateObjectCollection(Collection<T> stateObjects) {
+               this.stateObjects = stateObjects != null ? stateObjects : Collections.emptyList();
+       }
+
+       @Override
+       public int size() {
+               return stateObjects.size();
+       }
+
+       @Override
+       public boolean isEmpty() {
+               return stateObjects.isEmpty();
+       }
+
+       @Override
+       public boolean contains(Object o) {
+               return stateObjects.contains(o);
+       }
+
+       @Override
+       public Iterator<T> iterator() {
+               return stateObjects.iterator();
+       }
+
+       @Override
+       public Object[] toArray() {
+               return stateObjects.toArray();
+       }
+
+       @Override
+       public <T1> T1[] toArray(T1[] a) {
+               return stateObjects.toArray(a);
+       }
+
+       @Override
+       public boolean add(T t) {
+               return stateObjects.add(t);
+       }
+
+       @Override
+       public boolean remove(Object o) {
+               return stateObjects.remove(o);
+       }
+
+       @Override
+       public boolean containsAll(Collection<?> c) {
+               return stateObjects.containsAll(c);
+       }
+
+       @Override
+       public boolean addAll(Collection<? extends T> c) {
+               return stateObjects.addAll(c);
+       }
+
+       @Override
+       public boolean removeAll(Collection<?> c) {
+               return stateObjects.removeAll(c);
+       }
+
+       @Override
+       public boolean removeIf(Predicate<? super T> filter) {
+               return stateObjects.removeIf(filter);
+       }
+
+       @Override
+       public boolean retainAll(Collection<?> c) {
+               return stateObjects.retainAll(c);
+       }
+
+       @Override
+       public void clear() {
+               stateObjects.clear();
+       }
+
+       /**
+        * Discards all contained state objects by handing the wrapped collection to
+        * {@link StateUtil#bestEffortDiscardAllStateObjects}.
+        */
+       @Override
+       public void discardState() throws Exception {
+               StateUtil.bestEffortDiscardAllStateObjects(stateObjects);
+       }
+
+       /**
+        * Returns the sum of the state sizes of all contained state objects. Null entries count as zero.
+        */
+       @Override
+       public long getStateSize() {
+               return sumAllSizes(stateObjects);
+       }
+
+       /**
+        * Returns true if this contains at least one non-null {@link StateObject}.
+        */
+       public boolean hasState() {
+               for (StateObject state : stateObjects) {
+                       if (state != null) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       /**
+        * Two StateObjectCollections are equal if they are of the same class and contain the same elements with the
+        * same cardinalities, regardless of iteration order and of the concrete type of the wrapped collections.
+        */
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               StateObjectCollection<?> that = (StateObjectCollection<?>) o;
+
+               // simple equals can cause troubles here because of how equals works e.g. between lists and sets.
+               return CollectionUtils.isEqualCollection(stateObjects, that.stateObjects);
+       }
+
+       /**
+        * Returns a hash code that is consistent with {@link #equals(Object)}: the sum of the hash codes of all
+        * contained elements, where null entries contribute zero.
+        */
+       @Override
+       public int hashCode() {
+               // equals() ignores iteration order and the type of the wrapped collection, so the hash must not depend
+               // on either. Delegating to the wrapped collection would e.g. give a list and a set holding the same
+               // single element different hash codes although the two StateObjectCollections are equal.
+               int result = 0;
+               for (T stateObject : stateObjects) {
+                       result += (stateObject != null ? stateObject.hashCode() : 0);
+               }
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return "StateObjectCollection{" + stateObjects + '}';
+       }
+
+       // ------------------------------------------------------------------------
+       //  Helper methods.
+       // ------------------------------------------------------------------------
+
+       /**
+        * Returns the shared empty StateObjectCollection. The returned instance wraps an immutable empty list and
+        * therefore cannot be modified.
+        *
+        * @param <T> type of the (absent) state objects.
+        */
+       // The cast is safe: the shared instance wraps an immutable empty list and can never contain an element.
+       @SuppressWarnings("unchecked")
+       public static <T extends StateObject> StateObjectCollection<T> empty() {
+               return (StateObjectCollection<T>) EMPTY;
+       }
+
+       /**
+        * Returns a StateObjectCollection that wraps an immutable singleton collection of the given state object.
+        *
+        * @param stateObject the only element of the returned collection.
+        * @param <T> type of the state object.
+        */
+       public static <T extends StateObject> StateObjectCollection<T> singleton(T stateObject) {
+               return new StateObjectCollection<>(Collections.singleton(stateObject));
+       }
+
+       /** Sums up the state sizes of all given state objects, treating null entries as size zero. */
+       private static long sumAllSizes(Collection<? extends StateObject> stateObject) {
+               long size = 0L;
+               for (StateObject object : stateObject) {
+                       size += getSizeNullSafe(object);
+               }
+
+               return size;
+       }
+
+       /** Returns the state size of the given state object, or zero if it is null. */
+       private static long getSizeNullSafe(StateObject stateObject) {
+               return stateObject != null ? stateObject.getStateSize() : 0L;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
index 281693b..5aab33a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CompositeStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
index 28edc63..abc96a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -67,6 +70,7 @@ public class TaskStateSnapshot implements 
CompositeStateHandle {
        /**
         * Returns the subtask state for the given operator id (or null if not 
contained).
         */
+       @Nullable
        public OperatorSubtaskState getSubtaskStateByOperatorID(OperatorID 
operatorID) {
                return subtaskStatesByOperatorID.get(operatorID);
        }
@@ -75,7 +79,10 @@ public class TaskStateSnapshot implements 
CompositeStateHandle {
         * Maps the given operator id to the given subtask state. Returns the 
subtask state of a previous mapping, if such
         * a mapping existed or null otherwise.
         */
-       public OperatorSubtaskState putSubtaskStateByOperatorID(OperatorID 
operatorID, OperatorSubtaskState state) {
+       public OperatorSubtaskState putSubtaskStateByOperatorID(
+               @Nonnull OperatorID operatorID,
+               @Nonnull OperatorSubtaskState state) {
+
                return subtaskStatesByOperatorID.put(operatorID, 
Preconditions.checkNotNull(state));
        }
 
@@ -86,6 +93,18 @@ public class TaskStateSnapshot implements 
CompositeStateHandle {
                return subtaskStatesByOperatorID.entrySet();
        }
 
+       /**
+        * Checks whether any {@link OperatorSubtaskState} registered in subtaskStatesByOperatorID carries state.
+        *
+        * @return true if at least one non-null entry reports state, false otherwise (also if there are no entries).
+        */
+       public boolean hasState() {
+               return subtaskStatesByOperatorID.values().stream()
+                       .anyMatch(subtaskState -> subtaskState != null && subtaskState.hasState());
+       }
+
        @Override
        public void discardState() throws Exception {
                
StateUtil.bestEffortDiscardAllStateObjects(subtaskStatesByOperatorID.values());

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
index d7966e6..468b12f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.util.Disposable;
 
 import java.util.Collection;
 
@@ -36,7 +37,7 @@ import java.util.Collection;
  *
  * <p>Savepoints are serialized via a {@link SavepointSerializer}.
  */
-public interface Savepoint extends Versioned {
+public interface Savepoint extends Disposable, Versioned {
 
        /**
         * Returns the checkpoint ID of the savepoint.
@@ -74,9 +75,4 @@ public interface Savepoint extends Versioned {
         */
        Collection<OperatorState> getOperatorStates();
 
-       /**
-        * Disposes the savepoint.
-        */
-       void dispose() throws Exception;
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index c26c983..b3e6e89 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -25,11 +25,12 @@ import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -256,7 +257,7 @@ public class SavepointV1Serializer implements 
SavepointSerializer<SavepointV2> {
 
        @VisibleForTesting
        public static void serializeOperatorStateHandle(
-                       OperatorStateHandle stateHandle, DataOutputStream dos) 
throws IOException {
+               OperatorStateHandle stateHandle, DataOutputStream dos) throws 
IOException {
 
                if (stateHandle != null) {
                        dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
@@ -309,7 +310,7 @@ public class SavepointV1Serializer implements 
SavepointSerializer<SavepointV2> {
                                offsetsMap.put(key, metaInfo);
                        }
                        StreamStateHandle stateHandle = 
deserializeStreamStateHandle(dis);
-                       return new OperatorStateHandle(offsetsMap, stateHandle);
+                       return new OperatorStreamStateHandle(offsetsMap, 
stateHandle);
                } else {
                        throw new IllegalStateException("Reading invalid 
OperatorStateHandle, type: " + type);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
index 9e406df..bd3bfae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -27,8 +27,8 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index 5636a52..faee588 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -23,12 +23,13 @@ import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
@@ -433,7 +434,7 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
        }
 
        private static void serializeOperatorStateHandle(
-                       OperatorStateHandle stateHandle, DataOutputStream dos) 
throws IOException {
+               OperatorStateHandle stateHandle, DataOutputStream dos) throws 
IOException {
 
                if (stateHandle != null) {
                        dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
@@ -485,7 +486,7 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                                offsetsMap.put(key, metaInfo);
                        }
                        StreamStateHandle stateHandle = 
deserializeStreamStateHandle(dis);
-                       return new OperatorStateHandle(offsetsMap, stateHandle);
+                       return new OperatorStreamStateHandle(offsetsMap, 
stateHandle);
                } else {
                        throw new IllegalStateException("Reading invalid 
OperatorStateHandle, type: " + type);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index a504799..946f6e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -1240,7 +1240,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         */
        @VisibleForTesting
        public CompletableFuture<Collection<TaskManagerLocation>> 
calculatePreferredLocations(LocationPreferenceConstraint 
locationPreferenceConstraint) {
-               final Collection<CompletableFuture<TaskManagerLocation>> 
preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
+               final Collection<CompletableFuture<TaskManagerLocation>> 
preferredLocationFutures = getVertex().getPreferredLocations();
                final CompletableFuture<Collection<TaskManagerLocation>> 
preferredLocationsFuture;
 
                switch(locationPreferenceConstraint) {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
index a47041b..6c3577d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import java.io.IOException;
-
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
+import org.apache.flink.util.Disposable;
 import org.apache.flink.util.MutableObjectIterator;
 
+import java.io.IOException;
+
 /**
  *
  */
-public interface InMemorySorter<T> extends IndexedSortable {
+public interface InMemorySorter<T> extends IndexedSortable, Disposable {
        
        /**
         * Resets the sort buffer back to the state where it is empty. All 
contained data is discarded.
@@ -39,11 +40,12 @@ public interface InMemorySorter<T> extends IndexedSortable {
         * @return True, if no record is contained, false otherwise.
         */
        boolean isEmpty();
-       
+
        /**
         * Disposes the sorter.
         * This method does not release the memory segments used by the sorter.
         */
+       @Override
        void dispose();
        
        /**

Reply via email to