[hotfix] Remove outdated class OperatorStateHandles and replace it with 
OperatorSubtaskState


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/617e67c2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/617e67c2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/617e67c2

Branch: refs/heads/master
Commit: 617e67c2f540b62b97ca5198b6a5c42b89b6f392
Parents: 7b5c53f
Author: Stefan Richter <[email protected]>
Authored: Fri Feb 23 10:36:09 2018 +0100
Committer: Stefan Richter <[email protected]>
Committed: Sun Feb 25 15:10:28 2018 +0100

----------------------------------------------------------------------
 .../connectors/fs/RollingSinkITCase.java        |  8 +-
 .../bucketing/BucketingSinkMigrationTest.java   |  4 +-
 .../fs/bucketing/BucketingSinkTest.java         | 10 +--
 .../kafka/FlinkKafkaProducer011ITCase.java      | 21 +++--
 .../FlinkKafkaConsumerBaseMigrationTest.java    |  4 +-
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  6 +-
 .../FlinkKinesisConsumerMigrationTest.java      |  4 +-
 .../connectors/rabbitmq/RMQSourceTest.java      |  4 +-
 .../org/apache/flink/util/FileUtilsTest.java    |  2 +
 .../ContinuousFileProcessingMigrationTest.java  |  6 +-
 .../hdfstests/ContinuousFileProcessingTest.java |  6 +-
 .../flink/cep/operator/CEPMigrationTest.java    | 14 ++--
 .../flink/cep/operator/CEPOperatorTest.java     | 26 +++----
 .../flink/cep/operator/CEPRescalingTest.java    |  6 +-
 .../runtime/tasks/OperatorStateHandles.java     | 81 --------------------
 .../api/checkpoint/ListCheckpointedTest.java    |  8 +-
 .../api/functions/FromElementsFunctionTest.java |  4 +-
 .../functions/StatefulSequenceSourceTest.java   |  4 +-
 .../sink/TwoPhaseCommitSinkFunctionTest.java    |  8 +-
 .../operators/AbstractStreamOperatorTest.java   |  8 +-
 .../api/operators/KeyedProcessOperatorTest.java |  4 +-
 .../StreamOperatorSnapshotRestoreTest.java      |  4 +-
 .../WrappingFunctionSnapshotRestoreTest.java    |  6 +-
 .../operators/async/AsyncWaitOperatorTest.java  |  4 +-
 .../co/CoBroadcastWithKeyedOperatorTest.java    |  8 +-
 .../co/CoBroadcastWithNonKeyedOperatorTest.java | 10 +--
 .../co/KeyedCoProcessOperatorTest.java          |  4 +-
 .../ContinuousFileProcessingRescalingTest.java  |  6 +-
 .../operators/WriteAheadSinkTestBase.java       | 12 +--
 .../windowing/WindowOperatorContractTest.java   |  4 +-
 .../windowing/WindowOperatorMigrationTest.java  | 16 ++--
 .../operators/windowing/WindowOperatorTest.java | 22 +++---
 .../util/AbstractStreamOperatorTestHarness.java | 35 ++++-----
 .../streaming/util/OperatorSnapshotUtil.java    | 36 ++++-----
 .../PojoSerializerUpgradeTest.java              |  8 +-
 35 files changed, 164 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index e1124e4..1875792 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -23,13 +23,13 @@ import 
org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import 
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.AvroKeyValue;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.test.util.MiniClusterResource;
@@ -673,7 +673,7 @@ public class RollingSinkITCase extends TestLogger {
                testHarness.notifyOfCompletedCheckpoint(0);
                checkFs(outDir, 1, 0, 2, 0);
 
-               OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(1, 0);
 
                testHarness.close();
                checkFs(outDir, 0, 1, 2, 0);
@@ -735,7 +735,7 @@ public class RollingSinkITCase extends TestLogger {
                checkFs(outDir, 3, 5, 0, 0);
 
                // intentionally we snapshot them in a not ascending order so 
that the states are shuffled
-               OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
+               OperatorSubtaskState mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
                        testHarness3.snapshot(0, 0),
                        testHarness1.snapshot(0, 0),
                        testHarness2.snapshot(0, 0)
@@ -786,7 +786,7 @@ public class RollingSinkITCase extends TestLogger {
                checkFs(outDir, 2, 3, 0, 0);
 
                // intentionally we snapshot them in the reverse order so that 
the states are shuffled
-               OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
+               OperatorSubtaskState mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
                        testHarness2.snapshot(0, 0),
                        testHarness1.snapshot(0, 0)
                );

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
index 3b7c7d4..39e8d7d 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.streaming.connectors.fs.bucketing;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
 import org.apache.flink.streaming.util.migration.MigrationTestUtil;
@@ -135,7 +135,7 @@ public class BucketingSinkMigrationTest {
 
                checkFs(outDir, 1, 4, 0, 0);
 
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
                OperatorSnapshotUtil.writeStateHandle(snapshot, 
"src/test/resources/bucketing-sink-migration-test-flink" + 
flinkGenerateSavepointVersion + "-snapshot");
                testHarness.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index 6fa0713..39af139 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
 import org.apache.flink.streaming.connectors.fs.Clock;
@@ -29,7 +30,6 @@ import 
org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.connectors.fs.Writer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.NetUtils;
@@ -237,7 +237,7 @@ public class BucketingSinkTest extends TestLogger {
                testHarness.notifyOfCompletedCheckpoint(0);
                checkFs(outDir, 1, 0, 2, 0);
 
-               OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(1, 0);
 
                testHarness.close();
                checkFs(outDir, 0, 1, 2, 0);
@@ -287,7 +287,7 @@ public class BucketingSinkTest extends TestLogger {
                checkFs(outDir, 2, 0, 0, 0);
 
                // intentionally we snapshot them in the reverse order so that 
the states are shuffled
-               OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
+               OperatorSubtaskState mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
                        testHarness2.snapshot(0, 0),
                        testHarness1.snapshot(0, 0)
                );
@@ -348,7 +348,7 @@ public class BucketingSinkTest extends TestLogger {
                checkFs(outDir, 4, 0, 0, 0);
 
                // intentionally we snapshot them in the reverse order so that 
the states are shuffled
-               OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
+               OperatorSubtaskState mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
                        testHarness3.snapshot(0, 0),
                        testHarness1.snapshot(0, 0),
                        testHarness2.snapshot(0, 0)
@@ -393,7 +393,7 @@ public class BucketingSinkTest extends TestLogger {
                checkFs(outDir, 5, 0, 0, 0);
 
                // intentionally we snapshot them in the reverse order so that 
the states are shuffled
-               OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
+               OperatorSubtaskState mergedSnapshot = 
AbstractStreamOperatorTestHarness.repackageState(
                        testHarness2.snapshot(0, 0),
                        testHarness1.snapshot(0, 0)
                );

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index ab3a2e2..e76cf13 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -22,9 +22,9 @@ import org.apache.flink.api.common.ExecutionConfig;
 import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -87,7 +87,7 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
                        testHarness1.setup();
                        testHarness1.open();
                        testHarness1.processElement(42, 0);
-                       OperatorStateHandles snapshot = 
testHarness1.snapshot(0, 0);
+                       OperatorSubtaskState snapshot = 
testHarness1.snapshot(0, 0);
                        testHarness1.processElement(43, 0);
                        testHarness1.notifyOfCompletedCheckpoint(0);
                        try {
@@ -134,7 +134,7 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
                testHarness.processElement(42, 0);
                testHarness.snapshot(0, 1);
                testHarness.processElement(43, 2);
-               OperatorStateHandles snapshot = testHarness.snapshot(1, 3);
+               OperatorSubtaskState snapshot = testHarness.snapshot(1, 3);
 
                int leaderId = kafkaServer.getLeaderToShutDown(topic);
                failBroker(leaderId);
@@ -188,7 +188,7 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
                testHarness1.snapshot(0, 1);
                testHarness1.processElement(43, 2);
                int transactionCoordinatorId = 
kafkaProducer.getTransactionCoordinatorId();
-               OperatorStateHandles snapshot = testHarness1.snapshot(1, 3);
+               OperatorSubtaskState snapshot = testHarness1.snapshot(1, 3);
 
                failBroker(transactionCoordinatorId);
 
@@ -231,7 +231,7 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
                testHarness.processElement(42, 0);
                testHarness.snapshot(0, 1);
                testHarness.processElement(43, 2);
-               OperatorStateHandles snapshot1 = testHarness.snapshot(1, 3);
+               OperatorSubtaskState snapshot1 = testHarness.snapshot(1, 3);
 
                testHarness.processElement(44, 4);
                testHarness.snapshot(2, 5);
@@ -266,7 +266,7 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
        public void testFailAndRecoverSameCheckpointTwice() throws Exception {
                String topic = 
"flink-kafka-producer-fail-and-recover-same-checkpoint-twice";
 
-               OperatorStateHandles snapshot1;
+               OperatorSubtaskState snapshot1;
                try (OneInputStreamOperatorTestHarness<Integer, Object> 
testHarness = createTestHarness(topic)) {
                        testHarness.setup();
                        testHarness.open();
@@ -447,18 +447,17 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
 
                        testHarness.setup();
 
-                       testHarness.initializeState(new OperatorStateHandles(
-                               0,
+                       testHarness.initializeState(new OperatorSubtaskState(
+                               inputStates,
                                Collections.emptyList(),
                                Collections.emptyList(),
-                               inputStates,
                                Collections.emptyList()));
                        testHarness.open();
 
                        if (inputData.hasNext()) {
                                int nextValue = inputData.next();
                                testHarness.processElement(nextValue, 0);
-                               OperatorStateHandles snapshot = 
testHarness.snapshot(0, 0);
+                               OperatorSubtaskState snapshot = 
testHarness.snapshot(0, 0);
 
                                
outputStates.addAll(snapshot.getManagedOperatorState());
                                checkState(snapshot.getRawOperatorState() == 
null, "Unexpected raw operator state");
@@ -488,7 +487,7 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
                testHarness.setup();
                testHarness.open(); // producerA - start transaction (txn) 0
                testHarness.processElement(42, 0); // producerA - write 42 in 
txn 0
-               OperatorStateHandles checkpoint0 = testHarness.snapshot(0, 1); 
// producerA - pre commit txn 0, producerB - start txn 1
+               OperatorSubtaskState checkpoint0 = testHarness.snapshot(0, 1); 
// producerA - pre commit txn 0, producerB - start txn 1
                testHarness.processElement(43, 2); // producerB - write 43 in 
txn 1
                testHarness.notifyOfCompletedCheckpoint(0); // producerA - 
commit txn 0 and return to the pool
                testHarness.snapshot(1, 3); // producerB - pre txn 1,  
producerA - start txn 2

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index 94097e9..d9a9e41 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -33,7 +34,6 @@ import 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDi
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
 import org.apache.flink.streaming.util.migration.MigrationTestUtil;
@@ -169,7 +169,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
                        latch.await();
                }
 
-               final OperatorStateHandles snapshot;
+               final OperatorSubtaskState snapshot;
                synchronized (testHarness.getCheckpointLock()) {
                        snapshot = testHarness.snapshot(0L, 0L);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 403e627..263ed8e 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -51,7 +52,6 @@ import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.TestPartitionDiscoverer;
 import org.apache.flink.streaming.connectors.kafka.testutils.TestSourceContext;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -538,13 +538,13 @@ public class FlinkKafkaConsumerBaseTest {
                assertThat(globalSubscribedPartitions.values(), 
hasSize(numPartitions));
                assertThat(mockFetchedPartitionsOnStartup, 
everyItem(isIn(globalSubscribedPartitions.keySet())));
 
-               OperatorStateHandles[] state = new 
OperatorStateHandles[initialParallelism];
+               OperatorSubtaskState[] state = new 
OperatorSubtaskState[initialParallelism];
 
                for (int i = 0; i < initialParallelism; i++) {
                        state[i] = testHarnesses[i].snapshot(0, 0);
                }
 
-               OperatorStateHandles mergedState = 
AbstractStreamOperatorTestHarness.repackageState(state);
+               OperatorSubtaskState mergedState = 
AbstractStreamOperatorTestHarness.repackageState(state);
 
                // 
-----------------------------------------------------------------------------------------
                // restore

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index 75f584e..a99e845 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
@@ -35,7 +36,6 @@ import 
org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen
 import 
org.apache.flink.streaming.connectors.kinesis.testutils.TestRuntimeContext;
 import 
org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext;
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
 import org.apache.flink.streaming.util.migration.MigrationTestUtil;
@@ -367,7 +367,7 @@ public class FlinkKinesisConsumerMigrationTest {
 
                fetcher.waitUntilRun();
 
-               final OperatorStateHandles snapshot;
+               final OperatorSubtaskState snapshot;
                synchronized (testHarness.getCheckpointLock()) {
                        snapshot = testHarness.snapshot(0L, 0L);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
 
b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index bbf893f..01fea29 100644
--- 
a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ 
b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -25,13 +25,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 
 import com.rabbitmq.client.AMQP;
@@ -159,7 +159,7 @@ public class RMQSourceTest {
 
                for (int i = 0; i < numSnapshots; i++) {
                        long snapshotId = random.nextLong();
-                       OperatorStateHandles data;
+                       OperatorSubtaskState data;
 
                        synchronized (DummySourceContext.lock) {
                                data = testHarness.snapshot(snapshotId, 
System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java 
b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
index 67f831a..4cd4814 100644
--- a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CheckedThread;
 
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -143,6 +144,7 @@ public class FileUtilsTest {
                }
        }
 
+       @Ignore
        @Test
        public void testDeleteDirectoryConcurrently() throws Exception {
                final File parent = tmp.newFolder();

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
index 30a6119..d29ded5 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
@@ -36,7 +37,6 @@ import 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
@@ -146,7 +146,7 @@ public class ContinuousFileProcessingMigrationTest {
                // to initialize another reader and compare the results of the
                // two operators.
 
-               final OperatorStateHandles snapshot;
+               final OperatorSubtaskState snapshot;
                synchronized (testHarness.getCheckpointLock()) {
                        snapshot = testHarness.snapshot(0L, 0L);
                }
@@ -270,7 +270,7 @@ public class ContinuousFileProcessingMigrationTest {
                        latch.await();
                }
 
-               final OperatorStateHandles snapshot;
+               final OperatorSubtaskState snapshot;
                synchronized (testHarness.getCheckpointLock()) {
                        snapshot = testHarness.snapshot(0L, 0L);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 5d5a1c3..d880f4f 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
@@ -39,7 +40,6 @@ import 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.OperatingSystem;
@@ -438,7 +438,7 @@ public class ContinuousFileProcessingTest {
                // to initialize another reader and compare the results of the
                // two operators.
 
-               final OperatorStateHandles snapshot;
+               final OperatorSubtaskState snapshot;
                synchronized (initTestInstance.getCheckpointLock()) {
                        snapshot = initTestInstance.snapshot(0L, 0L);
                }
@@ -816,7 +816,7 @@ public class ContinuousFileProcessingTest {
                // this means it has processed all the splits and updated its 
state.
                synchronized (sourceContext.getCheckpointLock()) {}
 
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
                monitoringFunction.cancel();
                runner.join();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
index d45c611..2fda47f 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
@@ -26,10 +26,10 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
@@ -118,7 +118,7 @@ public class CEPMigrationTest {
                        harness.processWatermark(new Watermark(5));
 
                        // do snapshot and save to file
-                       OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
+                       OperatorSubtaskState snapshot = harness.snapshot(0L, 
0L);
                        OperatorSnapshotUtil.writeStateHandle(snapshot,
                                
"src/test/resources/cep-migration-after-branching-flink" + 
flinkGenerateSavepointVersion + "-snapshot");
                } finally {
@@ -205,7 +205,7 @@ public class CEPMigrationTest {
                        harness.processElement(new 
StreamRecord<Event>(middleEvent3, 23));
 
                        // simulate snapshot/restore with some elements in 
internal sorting queue
-                       OperatorStateHandles snapshot = harness.snapshot(1L, 
1L);
+                       OperatorSubtaskState snapshot = harness.snapshot(1L, 
1L);
                        harness.close();
 
                        harness = new KeyedOneInputStreamOperatorTestHarness<>(
@@ -279,7 +279,7 @@ public class CEPMigrationTest {
                        harness.processWatermark(new Watermark(5));
 
                        // do snapshot and save to file
-                       OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
+                       OperatorSubtaskState snapshot = harness.snapshot(0L, 
0L);
                        OperatorSnapshotUtil.writeStateHandle(snapshot,
                                
"src/test/resources/cep-migration-starting-new-pattern-flink" + 
flinkGenerateSavepointVersion + "-snapshot");
                } finally {
@@ -381,7 +381,7 @@ public class CEPMigrationTest {
                        harness.processElement(new 
StreamRecord<Event>(middleEvent3, 23));
 
                        // simulate snapshot/restore with some elements in 
internal sorting queue
-                       OperatorStateHandles snapshot = harness.snapshot(1L, 
1L);
+                       OperatorSubtaskState snapshot = harness.snapshot(1L, 
1L);
                        harness.close();
 
                        harness = new KeyedOneInputStreamOperatorTestHarness<>(
@@ -449,7 +449,7 @@ public class CEPMigrationTest {
                        harness.processWatermark(new Watermark(5));
 
                        // do snapshot and save to file
-                       OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
+                       OperatorSubtaskState snapshot = harness.snapshot(0L, 
0L);
                        OperatorSnapshotUtil.writeStateHandle(snapshot,
                                
"src/test/resources/cep-migration-single-pattern-afterwards-flink" + 
flinkGenerateSavepointVersion + "-snapshot");
                } finally {
@@ -542,7 +542,7 @@ public class CEPMigrationTest {
                        harness.processWatermark(new Watermark(6));
 
                        // do snapshot and save to file
-                       OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
+                       OperatorSubtaskState snapshot = harness.snapshot(0L, 
0L);
                        OperatorSnapshotUtil.writeStateHandle(snapshot,
                                
"src/test/resources/cep-migration-conditions-flink" + 
flinkGenerateSavepointVersion + "-snapshot");
                } finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 4fa6d09..322ee99 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -34,11 +34,11 @@ import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.OutputTag;
@@ -113,7 +113,7 @@ public class CEPOperatorTest extends TestLogger {
                        harness.processElement(new StreamRecord<>(new Event(42, 
"foobar", 1.0), 2L));
 
                        // simulate snapshot/restore with some elements in 
internal sorting queue
-                       OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
+                       OperatorSubtaskState snapshot = harness.snapshot(0L, 
0L);
                        harness.close();
 
                        harness = getCepTestHarness(false);
@@ -136,7 +136,7 @@ public class CEPOperatorTest extends TestLogger {
                        harness.processElement(new StreamRecord<>(endEvent, 
5L));
 
                        // simulate snapshot/restore with empty element queue 
but NFA state
-                       OperatorStateHandles snapshot2 = harness.snapshot(1L, 
1L);
+                       OperatorSubtaskState snapshot2 = harness.snapshot(1L, 
1L);
                        harness.close();
 
                        harness = getCepTestHarness(false);
@@ -182,7 +182,7 @@ public class CEPOperatorTest extends TestLogger {
                        harness.processElement(new StreamRecord<>(new Event(42, 
"foobar", 1.0), 2L));
 
                        // simulate snapshot/restore with some elements in 
internal sorting queue
-                       OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
+                       OperatorSubtaskState snapshot = harness.snapshot(0L, 
0L);
                        harness.close();
 
                        harness = getCepTestHarness(false);
@@ -205,7 +205,7 @@ public class CEPOperatorTest extends TestLogger {
                        harness.processWatermark(new Watermark(2L));
 
                        // simulate snapshot/restore with empty element queue 
but NFA state
-                       OperatorStateHandles snapshot2 = harness.snapshot(1L, 
1L);
+                       OperatorSubtaskState snapshot2 = harness.snapshot(1L, 
1L);
                        harness.close();
 
                        harness = getCepTestHarness(false);
@@ -348,7 +348,7 @@ public class CEPOperatorTest extends TestLogger {
                        harness.processElement(new StreamRecord<>(startEvent, 
1L));
 
                        // simulate snapshot/restore with some elements in 
internal sorting queue
-                       OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
+                       OperatorSubtaskState snapshot = harness.snapshot(0L, 
0L);
                        harness.close();
 
                        operator = 
CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory());
@@ -359,7 +359,7 @@ public class CEPOperatorTest extends TestLogger {
                        harness.open();
 
                        harness.processElement(new StreamRecord<>(new Event(42, 
"d", 1.0), 4L));
-                       OperatorStateHandles snapshot2 = harness.snapshot(0L, 
0L);
+                       OperatorSubtaskState snapshot2 = harness.snapshot(0L, 
0L);
                        harness.close();
 
                        operator = 
CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory());
@@ -409,7 +409,7 @@ public class CEPOperatorTest extends TestLogger {
                        harness.processElement(new StreamRecord<>(startEvent, 
1L));
 
                        // simulate snapshot/restore with some elements in 
internal sorting queue
-                       OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
+                       OperatorSubtaskState snapshot = harness.snapshot(0L, 
0L);
                        harness.close();
 
                        operator = 
CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory());
@@ -423,7 +423,7 @@ public class CEPOperatorTest extends TestLogger {
                        harness.open();
 
                        harness.processElement(new StreamRecord<>(new Event(42, 
"d", 1.0), 4L));
-                       OperatorStateHandles snapshot2 = harness.snapshot(0L, 
0L);
+                       OperatorSubtaskState snapshot2 = harness.snapshot(0L, 
0L);
                        harness.close();
 
                        operator = 
CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory());
@@ -588,7 +588,7 @@ public class CEPOperatorTest extends TestLogger {
                        harness.processElement(new StreamRecord<>(startEvent2, 
4L));
                        harness.processElement(new 
StreamRecord<Event>(middleEvent2, 5L));
 
-                       OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
+                       OperatorSubtaskState snapshot = harness.snapshot(0L, 
0L);
                        harness.close();
 
                        SelectCepOperator<Event, Integer, Map<String, 
List<Event>>> operator2 = getKeyedCepOperator(false);
@@ -764,7 +764,7 @@ public class CEPOperatorTest extends TestLogger {
                        harness.processElement(new StreamRecord<>(startEvent2, 
3L));
                        harness.processElement(new 
StreamRecord<Event>(middleEvent2, 4L));
 
-                       OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
+                       OperatorSubtaskState snapshot = harness.snapshot(0L, 
0L);
                        harness.close();
 
                        SelectCepOperator<Event, Integer, Map<String, 
List<Event>>> operator2 = getKeyedCepOperator(true);
@@ -955,7 +955,7 @@ public class CEPOperatorTest extends TestLogger {
                        harness.processElement(new StreamRecord<>(middleEvent1, 
3L));
                        harness.processElement(new StreamRecord<>(startEvent2, 
3L));
 
-                       OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
+                       OperatorSubtaskState snapshot = harness.snapshot(0L, 
0L);
                        harness.close();
 
                        SelectCepOperator<Event, Integer, Map<String, 
List<Event>>> operator2 = getKeyedCepOperatorWithComparator(true);
@@ -1016,7 +1016,7 @@ public class CEPOperatorTest extends TestLogger {
                        harness.processElement(new 
StreamRecord<Event>(middleEvent2, 5L));
                        harness.processElement(new 
StreamRecord<Event>(middleEvent1, 5L));
 
-                       OperatorStateHandles snapshot = harness.snapshot(0L, 
0L);
+                       OperatorSubtaskState snapshot = harness.snapshot(0L, 
0L);
                        harness.close();
 
                        SelectCepOperator<Event, Integer, Map<String, 
List<Event>>> operator2 = getKeyedCepOperatorWithComparator(false);

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index f5236c1..8a73556 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -26,11 +26,11 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -101,7 +101,7 @@ public class CEPRescalingTest {
                                new StreamRecord<Event>(middleEvent2, 4));      
          // valid element
 
                        // take a snapshot with some elements in internal 
sorting queue
-                       OperatorStateHandles snapshot = harness.snapshot(0, 0);
+                       OperatorSubtaskState snapshot = harness.snapshot(0, 0);
                        harness.close();
 
                        // initialize two sub-tasks with the previously 
snapshotted state to simulate scaling up
@@ -273,7 +273,7 @@ public class CEPRescalingTest {
 
                        // we take a snapshot and make it look as a single 
operator
                        // this will be the initial state of all downstream 
tasks.
-                       OperatorStateHandles snapshot = 
AbstractStreamOperatorTestHarness.repackageState(
+                       OperatorSubtaskState snapshot = 
AbstractStreamOperatorTestHarness.repackageState(
                                harness2.snapshot(0, 0),
                                harness1.snapshot(0, 0),
                                harness3.snapshot(0, 0)

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
deleted file mode 100644
index 0b03b79..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.util.CollectionUtil;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This class holds all state handles for one operator.
- */
-@Internal
-@VisibleForTesting
-public class OperatorStateHandles {
-
-       private final int operatorChainIndex;
-
-       private final Collection<KeyedStateHandle> managedKeyedState;
-       private final Collection<KeyedStateHandle> rawKeyedState;
-       private final Collection<OperatorStateHandle> managedOperatorState;
-       private final Collection<OperatorStateHandle> rawOperatorState;
-
-       public OperatorStateHandles(
-                       int operatorChainIndex,
-                       Collection<KeyedStateHandle> managedKeyedState,
-                       Collection<KeyedStateHandle> rawKeyedState,
-                       Collection<OperatorStateHandle> managedOperatorState,
-                       Collection<OperatorStateHandle> rawOperatorState) {
-
-               this.operatorChainIndex = operatorChainIndex;
-               this.managedKeyedState = managedKeyedState;
-               this.rawKeyedState = rawKeyedState;
-               this.managedOperatorState = managedOperatorState;
-               this.rawOperatorState = rawOperatorState;
-       }
-
-       public Collection<KeyedStateHandle> getManagedKeyedState() {
-               return managedKeyedState;
-       }
-
-       public Collection<KeyedStateHandle> getRawKeyedState() {
-               return rawKeyedState;
-       }
-
-       public Collection<OperatorStateHandle> getManagedOperatorState() {
-               return managedOperatorState;
-       }
-
-       public Collection<OperatorStateHandle> getRawOperatorState() {
-               return rawOperatorState;
-       }
-
-       public int getOperatorChainIndex() {
-               return operatorChainIndex;
-       }
-
-       private static <T> T getSafeItemAtIndexOrNull(List<T> list, int idx) {
-               return CollectionUtil.isNullOrEmpty(list) ? null : 
list.get(idx);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index 4d201f4..ff0b0fc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.api.checkpoint;
 
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 
 import org.junit.Assert;
@@ -41,7 +41,7 @@ public class ListCheckpointedTest {
                AbstractStreamOperatorTestHarness<Integer> testHarness =
                                new AbstractStreamOperatorTestHarness<>(new 
StreamMap<>(userFunction), 1, 1, 0);
                testHarness.open();
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
                testHarness.initializeState(snapshot);
                Assert.assertTrue(userFunction.isRestored());
        }
@@ -52,7 +52,7 @@ public class ListCheckpointedTest {
                AbstractStreamOperatorTestHarness<Integer> testHarness =
                                new AbstractStreamOperatorTestHarness<>(new 
StreamMap<>(userFunction), 1, 1, 0);
                testHarness.open();
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
                testHarness.initializeState(snapshot);
                Assert.assertTrue(userFunction.isRestored());
        }
@@ -63,7 +63,7 @@ public class ListCheckpointedTest {
                AbstractStreamOperatorTestHarness<Integer> testHarness =
                                new AbstractStreamOperatorTestHarness<>(new 
StreamMap<>(userFunction), 1, 1, 0);
                testHarness.open();
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
                testHarness.initializeState(snapshot);
                Assert.assertTrue(userFunction.isRestored());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
index 9268ef7..d92a009 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
@@ -26,10 +26,10 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.ExceptionUtils;
@@ -174,7 +174,7 @@ public class FromElementsFunctionTest {
 
                        // make a checkpoint
                        List<Integer> checkpointData = new 
ArrayList<>(numElements);
-                       OperatorStateHandles handles = null;
+                       OperatorSubtaskState handles = null;
                        synchronized (ctx.getCheckpointLock()) {
                                handles = testHarness.snapshot(566, 
System.currentTimeMillis());
                                checkpointData.addAll(result);

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
index de9f1c7..df70651 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 
 import org.junit.Assert;
@@ -112,7 +112,7 @@ public class StatefulSequenceSourceTest {
                        latchToTrigger2.await();
                }
 
-               OperatorStateHandles snapshot = 
AbstractStreamOperatorTestHarness.repackageState(
+               OperatorSubtaskState snapshot = 
AbstractStreamOperatorTestHarness.repackageState(
                        testHarness1.snapshot(0L, 0L),
                        testHarness2.snapshot(0L, 0L)
                );

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 3123675..166dc5a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.ContentDump;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
@@ -159,7 +159,7 @@ public class TwoPhaseCommitSinkFunctionTest {
                harness.processElement("42", 0);
                harness.snapshot(0, 1);
                harness.processElement("43", 2);
-               OperatorStateHandles snapshot = harness.snapshot(1, 3);
+               OperatorSubtaskState snapshot = harness.snapshot(1, 3);
 
                tmpDirectory.setWritable(false);
                try {
@@ -192,7 +192,7 @@ public class TwoPhaseCommitSinkFunctionTest {
                harness.open();
                harness.processElement("42", 0);
 
-               final OperatorStateHandles snapshot = harness.snapshot(0, 1);
+               final OperatorSubtaskState snapshot = harness.snapshot(0, 1);
                harness.notifyOfCompletedCheckpoint(1);
 
                final long transactionTimeout = 1000;
@@ -248,7 +248,7 @@ public class TwoPhaseCommitSinkFunctionTest {
 
                harness.open();
 
-               final OperatorStateHandles snapshot = harness.snapshot(0, 1);
+               final OperatorSubtaskState snapshot = harness.snapshot(0, 1);
                final long elapsedTime = (long) ((double) transactionTimeout * 
warningRatio + 2);
                clock.setEpochMilli(elapsedTime);
                harness.initializeState(snapshot);

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 97a0182..46cae27 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -37,7 +38,6 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
@@ -194,7 +194,7 @@ public class AbstractStreamOperatorTest {
 
                testHarness.processElement(new Tuple2<>(0, 
"SET_PROC_TIME_TIMER:10"), 0);
 
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
                TestOperator testOperator1 = new TestOperator();
 
@@ -303,7 +303,7 @@ public class AbstractStreamOperatorTest {
 
                assertTrue(extractResult(testHarness).isEmpty());
 
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
                // now, restore in two operators, first operator 1
 
@@ -445,7 +445,7 @@ public class AbstractStreamOperatorTest {
                // take a snapshot from each one of the "parallel" instances of 
the operator
                // and combine them into one so that we can scale down
 
-               OperatorStateHandles repackagedState =
+               OperatorSubtaskState repackagedState =
                        AbstractStreamOperatorTestHarness.repackageState(
                                testHarness1.snapshot(0, 0),
                                testHarness2.snapshot(0, 0)

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
index f043fa8..e1986f3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
@@ -22,11 +22,11 @@ import 
org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
@@ -255,7 +255,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
                testHarness.processElement(new StreamRecord<>(5, 12L));
 
                // snapshot and restore from scratch
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
                testHarness.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index c4b6894..9d0b9e2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
@@ -40,7 +41,6 @@ import 
org.apache.flink.runtime.state.StatePartitionStreamProvider;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 
@@ -83,7 +83,7 @@ public class StreamOperatorSnapshotRestoreTest {
                        testHarness.processElement(new StreamRecord<>(i));
                }
 
-               OperatorStateHandles handles = testHarness.snapshot(1L, 1L);
+               OperatorSubtaskState handles = testHarness.snapshot(1L, 1L);
 
                testHarness.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
index 6c93894..41bb00c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java
@@ -24,12 +24,12 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
 import org.junit.Assert;
@@ -59,7 +59,7 @@ public class WrappingFunctionSnapshotRestoreTest {
                testHarness.processElement(new StreamRecord<>(5, 12L));
 
                // snapshot and restore from scratch
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
                testHarness.close();
 
@@ -91,7 +91,7 @@ public class WrappingFunctionSnapshotRestoreTest {
                testHarness.processElement(new StreamRecord<>(5, 12L));
 
                // snapshot and restore from scratch
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
                testHarness.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 507ff0b..8e80f44 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -52,7 +53,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -969,7 +969,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
                snapshotHarness.open();
 
-               final OperatorStateHandles snapshot;
+               final OperatorSubtaskState snapshot;
 
                final ArrayList<Integer> expectedOutput = new 
ArrayList<>(capacity + 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
index 3fa439f..96607d4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
@@ -25,11 +25,11 @@ import 
org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.KeyedStateFunction;
 import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
@@ -413,7 +413,7 @@ public class CoBroadcastWithKeyedOperatorTest {
                keysToRegister.add("test2");
                keysToRegister.add("test3");
 
-               final OperatorStateHandles mergedSnapshot;
+               final OperatorSubtaskState mergedSnapshot;
 
                try (
                                TwoInputStreamOperatorTestHarness<String, 
Integer, String> testHarness1 = getInitializedTestHarness(
@@ -512,7 +512,7 @@ public class CoBroadcastWithKeyedOperatorTest {
                keysToRegister.add("test2");
                keysToRegister.add("test3");
 
-               final OperatorStateHandles mergedSnapshot;
+               final OperatorSubtaskState mergedSnapshot;
 
                try (
                                TwoInputStreamOperatorTestHarness<String, 
Integer, String> testHarness1 = getInitializedTestHarness(
@@ -710,7 +710,7 @@ public class CoBroadcastWithKeyedOperatorTest {
                        final int maxParallelism,
                        final int numTasks,
                        final int taskIdx,
-                       final OperatorStateHandles initState) throws Exception {
+                       final OperatorSubtaskState initState) throws Exception {
 
                final TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>  
testHarness =
                                new KeyedTwoInputStreamOperatorTestHarness<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
index 96e1c3e..fa72e45 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java
@@ -21,10 +21,10 @@ package org.apache.flink.streaming.api.operators.co;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
@@ -243,7 +243,7 @@ public class CoBroadcastWithNonKeyedOperatorTest {
                keysToRegister.add("test2");
                keysToRegister.add("test3");
 
-               final OperatorStateHandles mergedSnapshot;
+               final OperatorSubtaskState mergedSnapshot;
 
                try (
                                TwoInputStreamOperatorTestHarness<String, 
Integer, String> testHarness1 = getInitializedTestHarness(
@@ -335,7 +335,7 @@ public class CoBroadcastWithNonKeyedOperatorTest {
                keysToRegister.add("test2");
                keysToRegister.add("test3");
 
-               final OperatorStateHandles mergedSnapshot;
+               final OperatorSubtaskState mergedSnapshot;
 
                try (
                                TwoInputStreamOperatorTestHarness<String, 
Integer, String> testHarness1 = getInitializedTestHarness(
@@ -547,7 +547,7 @@ public class CoBroadcastWithNonKeyedOperatorTest {
 //                     final int maxParallelism,
 //                     final int numTasks,
 //                     final int taskIdx,
-//                     final OperatorStateHandles initState) throws Exception {
+//                     final OperatorSubtaskState initState) throws Exception {
 //
 //             return getInitializedTestHarness(function, maxParallelism, 
numTasks, taskIdx, initState, STATE_DESCRIPTOR);
 //     }
@@ -557,7 +557,7 @@ public class CoBroadcastWithNonKeyedOperatorTest {
                        final int maxParallelism,
                        final int numTasks,
                        final int taskIdx,
-                       final OperatorStateHandles initState,
+                       final OperatorSubtaskState initState,
                        final MapStateDescriptor<?, ?>... descriptors) throws 
Exception {
 
                TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> testHarness = 
new TwoInputStreamOperatorTestHarness<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
index 13c6a19..157053a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
@@ -22,11 +22,11 @@ import 
org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
@@ -294,7 +294,7 @@ public class KeyedCoProcessOperatorTest extends TestLogger {
                testHarness.processElement2(new StreamRecord<>("5", 12L));
 
                // snapshot and restore from scratch
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
                testHarness.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
index ca49a2a..bf83ab4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
@@ -25,12 +25,12 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
 import 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Preconditions;
@@ -89,7 +89,7 @@ public class ContinuousFileProcessingRescalingTest {
 
                // 2) and take the snapshots from the previous instances and 
merge them
                // into a new one which will be then used to initialize a third 
instance
-               OperatorStateHandles mergedState = 
AbstractStreamOperatorTestHarness.
+               OperatorSubtaskState mergedState = 
AbstractStreamOperatorTestHarness.
                        repackageState(
                                testHarness2.snapshot(0, 0),
                                testHarness1.snapshot(0, 0)
@@ -157,7 +157,7 @@ public class ContinuousFileProcessingRescalingTest {
                }
 
                // this will be the state shared by the 2 new instances.
-               OperatorStateHandles snapshot = testHarness1.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness1.snapshot(0, 0);
 
                // 1) clear the output of instance so that we can compare it 
with one created by the new instances, and
                // 2) let the operator process the rest of its state

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
index d3fd585..5fbce36 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TestLogger;
@@ -140,7 +140,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends 
GenericWriteAheadSink
                        elementCounter++;
                }
 
-               OperatorStateHandles latestSnapshot = 
testHarness.snapshot(snapshotCount++, 0);
+               OperatorSubtaskState latestSnapshot = 
testHarness.snapshot(snapshotCount++, 0);
                testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
 
                for (int x = 0; x < 20; x++) {
@@ -195,11 +195,11 @@ public abstract class WriteAheadSinkTestBase<IN, S 
extends GenericWriteAheadSink
                }
 
                // snapshot at checkpoint 0 for testHarness1 and testHarness 2
-               OperatorStateHandles snapshot1 = 
testHarness1.snapshot(snapshotCount, 0);
-               OperatorStateHandles snapshot2 = 
testHarness2.snapshot(snapshotCount, 0);
+               OperatorSubtaskState snapshot1 = 
testHarness1.snapshot(snapshotCount, 0);
+               OperatorSubtaskState snapshot2 = 
testHarness2.snapshot(snapshotCount, 0);
 
                // merge the two partial states
-               OperatorStateHandles mergedSnapshot = 
AbstractStreamOperatorTestHarness
+               OperatorSubtaskState mergedSnapshot = 
AbstractStreamOperatorTestHarness
                        .repackageState(snapshot1, snapshot2);
 
                testHarness1.close();
@@ -255,7 +255,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends 
GenericWriteAheadSink
                }
 
                // this will be the state that will be split between the two 
new operators
-               OperatorStateHandles snapshot = 
testHarness1.snapshot(++snapshotCount, 0);
+               OperatorSubtaskState snapshot = 
testHarness1.snapshot(++snapshotCount, 0);
 
                testHarness1.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 30b8da9..30382ed 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -21,6 +21,7 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -31,7 +32,6 @@ import 
org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -2356,7 +2356,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                assertEquals(5, testHarness.numKeyedStateEntries());
                assertEquals(4, testHarness.numEventTimeTimers()); // timers/gc 
timers for two windows
 
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
 
                // restore
                mockAssigner = mockMergingAssigner();

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
index ec537c6..e41b9cc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import 
org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
@@ -49,7 +50,6 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
@@ -142,7 +142,7 @@ public class WindowOperatorMigrationTest {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 1000));
 
                // do snapshot and save to file
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
                OperatorSnapshotUtil.writeStateHandle(
                        snapshot,
@@ -237,7 +237,7 @@ public class WindowOperatorMigrationTest {
                testHarness.open();
 
                // do snapshot and save to file
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
                OperatorSnapshotUtil.writeStateHandle(
                        snapshot,
                        
"src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink"
 + flinkGenerateSavepointVersion + "-snapshot");
@@ -369,7 +369,7 @@ public class WindowOperatorMigrationTest {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator<>());
 
                // do snapshot and save to file
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
                OperatorSnapshotUtil.writeStateHandle(
                        snapshot,
                        
"src/test/resources/win-op-migration-test-reduce-event-time-flink" + 
flinkGenerateSavepointVersion + "-snapshot");
@@ -485,7 +485,7 @@ public class WindowOperatorMigrationTest {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator<>());
 
                // do snapshot and save to file
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
                OperatorSnapshotUtil.writeStateHandle(
                        snapshot,
                        
"src/test/resources/win-op-migration-test-apply-event-time-flink" + 
flinkGenerateSavepointVersion + "-snapshot");
@@ -594,7 +594,7 @@ public class WindowOperatorMigrationTest {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator<>());
 
                // do snapshot and save to file
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
                OperatorSnapshotUtil.writeStateHandle(
                        snapshot,
                        
"src/test/resources/win-op-migration-test-reduce-processing-time-flink" + 
flinkGenerateSavepointVersion + "-snapshot");
@@ -699,7 +699,7 @@ public class WindowOperatorMigrationTest {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator<>());
 
                // do snapshot and save to file
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
                OperatorSnapshotUtil.writeStateHandle(
                        snapshot,
                        
"src/test/resources/win-op-migration-test-apply-processing-time-flink" + 
flinkGenerateSavepointVersion + "-snapshot");
@@ -813,7 +813,7 @@ public class WindowOperatorMigrationTest {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator<>());
 
                // do snapshot and save to file
-               OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0, 0);
                OperatorSnapshotUtil.writeStateHandle(
                        snapshot,
                        
"src/test/resources/win-op-migration-test-kryo-serialized-key-flink" + 
flinkGenerateSavepointVersion + "-snapshot");

Reply via email to