http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index f85b7fb..6926480 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import 
org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
@@ -64,7 +65,6 @@ import 
org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
@@ -145,7 +145,7 @@ public class WindowOperatorTest extends TestLogger {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
 
                // do a snapshot, close and restore again
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
                testHarness.close();
 
                expectedOutput.clear();
@@ -267,7 +267,7 @@ public class WindowOperatorTest extends TestLogger {
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
 
                // do a snapshot, close and restore again
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple2ResultSortComparator());
                testHarness.close();
 
@@ -402,7 +402,7 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 1000));
 
                // do a snapshot, close and restore again
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
 
@@ -480,7 +480,7 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 1000));
 
                // do a snapshot, close and restore again
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
 
@@ -555,7 +555,7 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 3), 2500));
 
                // do a snapshot, close and restore again
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
                testHarness.close();
 
                testHarness = createTestHarness(operator);
@@ -628,7 +628,7 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 3), 2500));
 
                // do a snapshot, close and restore again
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
                testHarness.close();
 
                testHarness = createTestHarness(operator);
@@ -709,7 +709,7 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key1", 2), 1000));
 
                // do a snapshot, close and restore again
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
                testHarness.close();
 
                expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 
0L, 6500L), 6499));
@@ -791,7 +791,7 @@ public class WindowOperatorTest extends TestLogger {
                expectedOutput.add(new Watermark(3000));
 
                // do a snapshot, close and restore again
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new 
Tuple3ResultSortComparator());
 
@@ -846,7 +846,7 @@ public class WindowOperatorTest extends TestLogger {
                                null /* late data output tag */);
 
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-               OperatorStateHandles snapshot;
+               OperatorSubtaskState snapshot;
 
                try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple3<String, Long, Long>> testHarness =
                                createTestHarness(operator)) {
@@ -1017,7 +1017,7 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1999));
 
                // do a snapshot, close and restore again
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
+               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
 
                testHarness.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index ced22c0..d38cb28 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -57,7 +57,6 @@ import 
org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -320,7 +319,7 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
         * in the local key-group range and the operator states that would be 
assigned to the local
         * subtask.
         */
-       public void initializeState(OperatorStateHandles operatorStateHandles) 
throws Exception {
+       public void initializeState(OperatorSubtaskState operatorStateHandles) 
throws Exception {
                if (!setupCalled) {
                        setup();
                }
@@ -391,12 +390,12 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
        }
 
        /**
-        * Takes the different {@link OperatorStateHandles} created by calling 
{@link #snapshot(long, long)}
+        * Takes the different {@link OperatorSubtaskState} created by calling 
{@link #snapshot(long, long)}
         * on different instances of {@link AbstractStreamOperatorTestHarness} 
(each one representing one subtask)
-        * and repacks them into a single {@link OperatorStateHandles} so that 
the parallelism of the test
+        * and repacks them into a single {@link OperatorSubtaskState} so that 
the parallelism of the test
         * can change arbitrarily (i.e. be able to scale both up and down).
         *
-        * <p>After repacking the partial states, use {@link 
#initializeState(OperatorStateHandles)} to initialize
+        * <p>After repacking the partial states, use {@link 
#initializeState(OperatorSubtaskState)} to initialize
         * a new instance with the resulting state. Bear in mind that for 
parallelism greater than one, you
         * have to use the constructor {@link 
#AbstractStreamOperatorTestHarness(StreamOperator, int, int, int)}.
         *
@@ -409,7 +408,7 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
         * @param handles the different states to be merged.
         * @return the resulting state, or {@code null} if no partial states 
are specified.
         */
-       public static OperatorStateHandles 
repackageState(OperatorStateHandles... handles) throws Exception {
+       public static OperatorSubtaskState 
repackageState(OperatorSubtaskState... handles) throws Exception {
 
                if (handles.length < 1) {
                        return null;
@@ -423,7 +422,7 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                List<KeyedStateHandle> mergedManagedKeyedState = new 
ArrayList<>(handles.length);
                List<KeyedStateHandle> mergedRawKeyedState = new 
ArrayList<>(handles.length);
 
-               for (OperatorStateHandles handle: handles) {
+               for (OperatorSubtaskState handle: handles) {
 
                        Collection<OperatorStateHandle> managedOperatorState = 
handle.getManagedOperatorState();
                        Collection<OperatorStateHandle> rawOperatorState = 
handle.getRawOperatorState();
@@ -447,12 +446,11 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                        }
                }
 
-               return new OperatorStateHandles(
-                       0,
-                       mergedManagedKeyedState,
-                       mergedRawKeyedState,
+               return new OperatorSubtaskState(
                        mergedManagedOperatorState,
-                       mergedRawOperatorState);
+                       mergedRawOperatorState,
+                       mergedManagedKeyedState,
+                       mergedRawKeyedState);
        }
 
        /**
@@ -470,7 +468,7 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
        /**
         * Calls {@link StreamOperator#snapshotState(long, long, 
CheckpointOptions, org.apache.flink.runtime.state.CheckpointStreamFactory)}.
         */
-       public OperatorStateHandles snapshot(long checkpointId, long timestamp) 
throws Exception {
+       public OperatorSubtaskState snapshot(long checkpointId, long timestamp) 
throws Exception {
 
                OperatorSnapshotResult operatorStateResult = 
operator.snapshotState(
                        checkpointId,
@@ -484,12 +482,11 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                OperatorStateHandle opManaged = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture());
                OperatorStateHandle opRaw = 
FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture());
 
-               return new OperatorStateHandles(
-                       0,
-                       keyedManaged != null ? 
Collections.singletonList(keyedManaged) : null,
-                       keyedRaw != null ? Collections.singletonList(keyedRaw) 
: null,
-                       opManaged != null ? 
Collections.singletonList(opManaged) : null,
-                       opRaw != null ? Collections.singletonList(opRaw) : 
null);
+               return new OperatorSubtaskState(
+                       opManaged != null ? 
Collections.singletonList(opManaged) : Collections.emptyList(),
+                       opRaw != null ? Collections.singletonList(opRaw) : 
Collections.emptyList(),
+                       keyedManaged != null ? 
Collections.singletonList(keyedManaged) : Collections.emptyList(),
+                       keyedRaw != null ? Collections.singletonList(keyedRaw) 
: Collections.emptyList());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 33f32e9..8d37266 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.streaming.util;
 
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -34,7 +34,7 @@ import java.util.Collection;
 import java.util.List;
 
 /**
- * Util for writing/reading {@link 
org.apache.flink.streaming.runtime.tasks.OperatorStateHandles},
+ * Util for writing/reading {@link OperatorSubtaskState},
  * for use in tests.
  */
 public class OperatorSnapshotUtil {
@@ -45,12 +45,13 @@ public class OperatorSnapshotUtil {
                return resource.getFile();
        }
 
-       public static void writeStateHandle(OperatorStateHandles state, String 
path) throws IOException {
+       public static void writeStateHandle(OperatorSubtaskState state, String 
path) throws IOException {
                FileOutputStream out = new FileOutputStream(path);
 
                try (DataOutputStream dos = new DataOutputStream(out)) {
 
-                       dos.writeInt(state.getOperatorChainIndex());
+                       // formerly the operator chain index; a constant 0 is still written for format compatibility
+                       dos.writeInt(0);
 
                        // still required for compatibility
                        SavepointV1Serializer.serializeStreamStateHandle(null, 
dos);
@@ -103,18 +104,19 @@ public class OperatorSnapshotUtil {
                }
        }
 
-       public static OperatorStateHandles readStateHandle(String path) throws 
IOException, ClassNotFoundException {
+       public static OperatorSubtaskState readStateHandle(String path) throws 
IOException, ClassNotFoundException {
                FileInputStream in = new FileInputStream(path);
                try (DataInputStream dis = new DataInputStream(in)) {
-                       int index = dis.readInt();
+
+                       // formerly the operator chain index; the value is ignored and only read to consume the bytes
+                       dis.readInt();
 
                        // still required for compatibility to consume the 
bytes.
                        SavepointV1Serializer.deserializeStreamStateHandle(dis);
 
-                       List<OperatorStateHandle> rawOperatorState = null;
+                       List<OperatorStateHandle> rawOperatorState = new 
ArrayList<>();
                        int numRawOperatorStates = dis.readInt();
                        if (numRawOperatorStates >= 0) {
-                               rawOperatorState = new ArrayList<>();
                                for (int i = 0; i < numRawOperatorStates; i++) {
                                        OperatorStateHandle operatorState = 
SavepointV1Serializer.deserializeOperatorStateHandle(
                                                dis);
@@ -122,10 +124,9 @@ public class OperatorSnapshotUtil {
                                }
                        }
 
-                       List<OperatorStateHandle> managedOperatorState = null;
+                       List<OperatorStateHandle> managedOperatorState = new 
ArrayList<>();
                        int numManagedOperatorStates = dis.readInt();
                        if (numManagedOperatorStates >= 0) {
-                               managedOperatorState = new ArrayList<>();
                                for (int i = 0; i < numManagedOperatorStates; 
i++) {
                                        OperatorStateHandle operatorState = 
SavepointV1Serializer.deserializeOperatorStateHandle(
                                                dis);
@@ -133,10 +134,9 @@ public class OperatorSnapshotUtil {
                                }
                        }
 
-                       List<KeyedStateHandle> rawKeyedState = null;
+                       List<KeyedStateHandle> rawKeyedState = new 
ArrayList<>();
                        int numRawKeyedStates = dis.readInt();
                        if (numRawKeyedStates >= 0) {
-                               rawKeyedState = new ArrayList<>();
                                for (int i = 0; i < numRawKeyedStates; i++) {
                                        KeyedStateHandle keyedState = 
SavepointV1Serializer.deserializeKeyedStateHandle(
                                                dis);
@@ -144,10 +144,9 @@ public class OperatorSnapshotUtil {
                                }
                        }
 
-                       List<KeyedStateHandle> managedKeyedState = null;
+                       List<KeyedStateHandle> managedKeyedState = new 
ArrayList<>();
                        int numManagedKeyedStates = dis.readInt();
                        if (numManagedKeyedStates >= 0) {
-                               managedKeyedState = new ArrayList<>();
                                for (int i = 0; i < numManagedKeyedStates; i++) 
{
                                        KeyedStateHandle keyedState = 
SavepointV1Serializer.deserializeKeyedStateHandle(
                                                dis);
@@ -155,12 +154,11 @@ public class OperatorSnapshotUtil {
                                }
                        }
 
-                       return new OperatorStateHandles(
-                               index,
-                               managedKeyedState,
-                               rawKeyedState,
+                       return new OperatorSubtaskState(
                                managedOperatorState,
-                               rawOperatorState);
+                               rawOperatorState,
+                               managedKeyedState,
+                               rawKeyedState);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index 908666a..aed6663 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -45,7 +46,6 @@ import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.DynamicCodeLoadingException;
@@ -307,7 +307,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
                        new URL[]{rootPath.toURI().toURL()},
                        Thread.currentThread().getContextClassLoader());
 
-               OperatorStateHandles stateHandles = runOperator(
+               OperatorSubtaskState stateHandles = runOperator(
                        taskConfiguration,
                        executionConfig,
                        new StreamMap<>(new StatefulMapper(isKeyedState, false, 
hasBField)),
@@ -340,7 +340,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
                        inputs);
        }
 
-       private OperatorStateHandles runOperator(
+       private OperatorSubtaskState runOperator(
                        Configuration taskConfiguration,
                        ExecutionConfig executionConfig,
                        OneInputStreamOperator<Long, Long> operator,
@@ -348,7 +348,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
                        boolean isKeyedState,
                        StateBackend stateBackend,
                        ClassLoader classLoader,
-                       OperatorStateHandles operatorStateHandles,
+                       OperatorSubtaskState operatorStateHandles,
                        Iterable<Long> input) throws Exception {
 
                try (final MockEnvironment environment = new MockEnvironment(

Reply via email to