[FLINK-4379] [checkpoints] Introduce rescalable operator state

This introduces the Operator State Backend, which stores state that is not
partitioned by a key. It replaces the 'Checkpointed' interface.

Additionally, this introduces CheckpointStateHandles as a container for all
checkpoint-related state handles.

This closes #2512


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53ed6ada
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53ed6ada
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53ed6ada

Branch: refs/heads/master
Commit: 53ed6adac8cbe6b5dcb692dc9b94970f3ec5887c
Parents: 2afc092
Author: Stefan Richter <[email protected]>
Authored: Wed Aug 31 23:59:27 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Sep 30 12:38:46 2016 +0200

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |   6 +-
 .../state/RocksDBKeyedStateBackend.java         |  75 ++--
 .../streaming/state/RocksDBStateBackend.java    |   8 +-
 .../state/RocksDBAsyncSnapshotTest.java         |  12 +-
 .../state/RocksDBStateBackendConfigTest.java    |  48 ++-
 .../api/common/functions/RuntimeContext.java    | 125 +-----
 .../util/AbstractRuntimeUDFContext.java         |  28 +-
 .../flink/api/common/state/OperatorState.java   |  70 ---
 .../flink/api/common/state/ValueState.java      |   2 +-
 .../java/typeutils/runtime/JavaSerializer.java  | 116 +++++
 .../flink/hdfstests/FileStateBackendTest.java   |  26 +-
 .../AbstractCEPBasePatternOperator.java         |   3 +-
 .../operator/AbstractCEPPatternOperator.java    |   2 -
 .../AbstractKeyedCEPPatternOperator.java        |   2 -
 .../checkpoint/CheckpointCoordinator.java       | 127 +++++-
 .../runtime/checkpoint/CompletedCheckpoint.java |   5 -
 .../checkpoint/OperatorStateRepartitioner.java  |  42 ++
 .../runtime/checkpoint/PendingCheckpoint.java   |  95 +++--
 .../RoundRobinOperatorStateRepartitioner.java   | 190 +++++++++
 .../flink/runtime/checkpoint/SubtaskState.java  |   9 -
 .../flink/runtime/checkpoint/TaskState.java     |  79 +++-
 .../savepoint/SavepointV1Serializer.java        |  97 ++++-
 .../deployment/TaskDeploymentDescriptor.java    |  50 ++-
 .../flink/runtime/execution/Environment.java    |  16 +-
 .../flink/runtime/executiongraph/Execution.java |  25 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   2 -
 .../runtime/executiongraph/ExecutionVertex.java |   6 +-
 .../runtime/jobgraph/tasks/StatefulTask.java    |  11 +-
 .../checkpoint/AcknowledgeCheckpoint.java       |  67 ++-
 .../runtime/state/AbstractCloseableHandle.java  | 126 ------
 .../state/AbstractKeyedStateBackend.java        | 342 +++++++++++++++
 .../runtime/state/AbstractStateBackend.java     |  43 +-
 .../flink/runtime/state/ChainedStateHandle.java |   7 +-
 .../runtime/state/CheckpointStateHandles.java   | 103 +++++
 .../flink/runtime/state/ClosableRegistry.java   |  84 ++++
 .../state/DefaultOperatorStateBackend.java      | 215 ++++++++++
 .../runtime/state/KeyGroupRangeOffsets.java     |   2 +
 .../runtime/state/KeyGroupsStateHandle.java     |   6 -
 .../flink/runtime/state/KeyedStateBackend.java  | 301 ++-----------
 .../runtime/state/OperatorStateBackend.java     |  35 ++
 .../runtime/state/OperatorStateHandle.java      | 109 +++++
 .../flink/runtime/state/OperatorStateStore.java |  47 +++
 ...artitionableCheckpointStateOutputStream.java |  96 +++++
 .../state/RetrievableStreamStateHandle.java     |   2 +-
 .../flink/runtime/state/SnapshotProvider.java   |  45 ++
 .../apache/flink/runtime/state/StateObject.java |   6 +-
 .../apache/flink/runtime/state/StateUtil.java   |  37 --
 .../state/filesystem/FileStateHandle.java       |   8 +-
 .../state/filesystem/FsStateBackend.java        |   6 +-
 .../state/heap/HeapKeyedStateBackend.java       | 210 ++++-----
 .../state/memory/ByteStreamStateHandle.java     |  13 +-
 .../state/memory/MemoryStateBackend.java        |   9 +-
 .../ActorGatewayCheckpointResponder.java        |  11 +-
 .../taskmanager/CheckpointResponder.java        |  15 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |  12 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  11 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 421 +++++++++++++++----
 .../checkpoint/CheckpointStateRestoreTest.java  |  46 +-
 .../CompletedCheckpointStoreTest.java           |   2 +-
 .../checkpoint/PendingCheckpointTest.java       |   2 +-
 .../checkpoint/PendingSavepointTest.java        |   2 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |   5 -
 .../checkpoint/savepoint/SavepointV1Test.java   |  20 +-
 .../stats/SimpleCheckpointStatsTrackerTest.java |   2 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  20 +-
 .../messages/CheckpointMessagesTest.java        |  17 +-
 .../operators/testutils/DummyEnvironment.java   |   3 +-
 .../operators/testutils/MockEnvironment.java    |   3 +-
 .../runtime/query/QueryableStateClientTest.java |   4 +-
 .../runtime/query/netty/KvStateClientTest.java  |   5 +-
 .../query/netty/KvStateServerHandlerTest.java   |   7 +-
 .../runtime/query/netty/KvStateServerTest.java  |   4 +-
 .../state/AbstractCloseableHandleTest.java      |  97 -----
 .../runtime/state/FileStateBackendTest.java     |  35 +-
 .../runtime/state/MemoryStateBackendTest.java   |  15 +-
 .../runtime/state/OperatorStateBackendTest.java | 155 +++++++
 .../runtime/state/StateBackendTestBase.java     | 115 ++---
 .../FsCheckpointStateOutputStreamTest.java      |  16 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   5 +-
 .../ZooKeeperStateHandleStoreITCase.java        |   4 -
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  37 +-
 .../connectors/kafka/KafkaConsumer08Test.java   |   4 +-
 .../connectors/kafka/FlinkKafkaConsumer09.java  |  43 +-
 .../kafka/FlinkKafkaConsumerBase.java           | 182 +++++---
 .../kafka/FlinkKafkaProducerBase.java           |  27 +-
 .../kafka/internals/AbstractFetcher.java        |   4 +-
 .../kafka/AtLeastOnceProducerTest.java          |  13 +-
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 149 ++++++-
 .../connectors/kafka/KafkaConsumerTestBase.java |  30 +-
 .../kafka/testutils/MockRuntimeContext.java     |  10 -
 .../streaming/api/checkpoint/Checkpointed.java  |   1 +
 .../api/checkpoint/CheckpointedFunction.java    |  65 +++
 .../api/checkpoint/ListCheckpointed.java        |  65 +++
 .../source/ContinuousFileReaderOperator.java    |   5 +-
 .../api/operators/AbstractStreamOperator.java   |  96 ++++-
 .../operators/AbstractUdfStreamOperator.java    |  65 ++-
 .../operators/StreamCheckpointedOperator.java   |  58 +++
 .../streaming/api/operators/StreamOperator.java |  43 +-
 .../api/operators/StreamingRuntimeContext.java  |  32 --
 .../operators/GenericWriteAheadSink.java        |  25 +-
 .../windowing/EvictingWindowOperator.java       |   4 +-
 .../operators/windowing/WindowOperator.java     |  14 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  50 ++-
 .../streaming/runtime/tasks/StreamTask.java     | 314 ++++++++------
 .../operators/StreamingRuntimeContextTest.java  |   8 +-
 .../streaming/runtime/io/BarrierBufferTest.java |   8 +-
 .../runtime/io/BarrierTrackerTest.java          |  13 +-
 .../operators/StreamOperatorChainingTest.java   |  15 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |  68 +--
 .../runtime/tasks/OneInputStreamTaskTest.java   |  82 +++-
 .../runtime/tasks/StreamMockEnvironment.java    |   3 +-
 .../KeyedOneInputStreamOperatorTestHarness.java |  37 +-
 .../util/OneInputStreamOperatorTestHarness.java |  24 +-
 .../UdfStreamOperatorCheckpointingITCase.java   |  16 +-
 .../streaming/runtime/StateBackendITCase.java   |  11 +-
 115 files changed, 3981 insertions(+), 1890 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index e878ad5..9da33ef 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -156,7 +156,7 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
 
        private void writeKey(K key) throws IOException {
                //write key
-               int beforeWrite = (int) keySerializationStream.getPosition();
+               int beforeWrite = keySerializationStream.getPosition();
                backend.getKeySerializer().serialize(key, 
keySerializationDateDataOutputView);
 
                if (ambiguousKeyPossible) {
@@ -166,7 +166,7 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
        }
 
        private void writeNameSpace(N namespace) throws IOException {
-               int beforeWrite = (int) keySerializationStream.getPosition();
+               int beforeWrite = keySerializationStream.getPosition();
                namespaceSerializer.serialize(namespace, 
keySerializationDateDataOutputView);
 
                if (ambiguousKeyPossible) {
@@ -176,7 +176,7 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
        }
 
        private void writeLengthFrom(int fromPosition) throws IOException {
-               int length = (int) (keySerializationStream.getPosition() - 
fromPosition);
+               int length = keySerializationStream.getPosition() - 
fromPosition;
                writeVariableIntBytes(length);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index d5a96af..126ebd2 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -39,12 +39,12 @@ import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.InstantiationUtil;
@@ -73,12 +73,12 @@ import java.util.PriorityQueue;
 import java.util.concurrent.RunnableFuture;
 
 /**
- * A {@link KeyedStateBackend} that stores its state in {@code RocksDB} and 
will serialize state to
+ * A {@link AbstractKeyedStateBackend} that stores its state in {@code 
RocksDB} and will serialize state to
  * streams provided by a {@link 
org.apache.flink.runtime.state.CheckpointStreamFactory} upon
  * checkpointing. This state backend can store very large state that exceeds 
memory and spills
- * to disk.
+ * to disk. Except for the snapshotting, this class should be accessed as if 
it is not threadsafe.
  */
-public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
+public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
 
@@ -98,9 +98,9 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
        private final File instanceRocksDBPath;
 
        /**
-        * Lock for protecting cleanup of the RocksDB db. We acquire this when 
doing asynchronous
-        * checkpoints and when disposing the db. Otherwise, the asynchronous 
snapshot might try
-        * iterating over a disposed db.
+        * Lock for protecting cleanup of the RocksDB against the checkpointing 
thread. We acquire this when doing
+        * asynchronous checkpoints and when disposing the DB. Otherwise, the 
asynchronous snapshot might try
+        * iterating over a disposed DB. After acquiring the lock, always first 
check if (db == null).
         */
        private final SerializableObject dbDisposeLock = new 
SerializableObject();
 
@@ -110,13 +110,13 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
         * instance. They all write to this instance but to their own column 
family.
         */
        @GuardedBy("dbDisposeLock")
-       protected volatile RocksDB db;
+       protected RocksDB db;
 
        /**
         * Information about the k/v states as we create them. This is used to 
retrieve the
         * column family that is used for a state and also for sanity checks 
when restoring.
         */
-       private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> 
kvStateInformation;
+       private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>> 
kvStateInformation;
 
        /** Number of bytes required to prefix the key groups. */
        private final int keyGroupPrefixBytes;
@@ -187,8 +187,8 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
                        KeyGroupRange keyGroupRange,
                        List<KeyGroupsStateHandle> restoreState
        ) throws Exception {
-               this(
-                       jobId,
+
+               this(jobId,
                        operatorIdentifier,
                        userCodeClassLoader,
                        instanceBasePath,
@@ -210,15 +210,11 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
        }
 
        /**
-        * @see java.io.Closeable
-        *
-        * Should only be called by one thread.
-        *
-        * @throws Exception
+        * Should only be called by one thread, and only after all accesses to 
the DB happened.
         */
        @Override
-       public void close() throws Exception {
-               super.close();
+       public void dispose() {
+               super.dispose();
 
                final RocksDB cleanupRockDBReference;
 
@@ -233,13 +229,17 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
 
                // Dispose decoupled db
                if (cleanupRockDBReference != null) {
-                       for (Tuple2<ColumnFamilyHandle, StateDescriptor> column 
: kvStateInformation.values()) {
+                       for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> 
column : kvStateInformation.values()) {
                                column.f0.dispose();
                        }
                        cleanupRockDBReference.dispose();
                }
 
-               FileUtils.deleteDirectory(instanceBasePath);
+               try {
+                       FileUtils.deleteDirectory(instanceBasePath);
+               } catch (IOException ioex) {
+                       LOG.info("Could not delete instace base path for 
RocksDB: " + instanceBasePath);
+               }
        }
 
        public int getKeyGroupPrefixBytes() {
@@ -248,7 +248,7 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
 
        /**
         * Triggers an asynchronous snapshot of the keyed state backend from 
RocksDB. This snapshot can be canceled and
-        * is also stopped when the backend is closed through {@link #close()}. 
For each backend, this method must always
+        * is also stopped when the backend is closed through {@link 
#dispose()}. For each backend, this method must always
         * be called by the same thread.
         *
         * @param checkpointId The Id of the checkpoint.
@@ -386,13 +386,13 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
                        Preconditions.checkArgument(outStream == null, "Output 
stream for snapshot is already set.");
                        outStream = checkpointStreamFactory.
                                        
createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp);
+                       
stateBackend.cancelStreamRegistry.registerClosable(outStream);
                        outputView = new DataOutputViewStreamWrapper(outStream);
                }
 
                /**
                 * 3) Write the actual data from RocksDB from the time we took 
the snapshot object in (1).
                 *
-                * @return
                 * @throws IOException
                 */
                public void writeDBSnapshot() throws IOException, 
InterruptedException {
@@ -408,7 +408,8 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
                 * @throws IOException
                 */
                public void closeCheckpointStream() throws IOException {
-                       if(outStream != null) {
+                       if (outStream != null) {
+                               
stateBackend.cancelStreamRegistry.unregisterClosable(outStream);
                                snapshotResultStateHandle = 
closeSnapshotStreamAndGetHandle();
                        }
                }
@@ -451,7 +452,7 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
 
                        int kvStateId = 0;
                        //iterate all column families, where each column family 
holds one k/v state, to write the metadata
-                       for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
StateDescriptor>> column : stateBackend.kvStateInformation.entrySet()) {
+                       for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
StateDescriptor<?, ?>>> column : stateBackend.kvStateInformation.entrySet()) {
 
                                //be cooperative and check for interruption 
from time to time in the hot loop
                                checkInterrupted();
@@ -463,7 +464,7 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
                                ReadOptions readOptions = new ReadOptions();
                                readOptions.setSnapshot(snapshot);
                                RocksIterator iterator = 
stateBackend.db.newIterator(column.getValue().f0, readOptions);
-                               kvStateIterators.add(new Tuple2<RocksIterator, 
Integer>(iterator, kvStateId));
+                               kvStateIterators.add(new Tuple2<>(iterator, 
kvStateId));
                                ++kvStateId;
                        }
                }
@@ -624,15 +625,16 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
                                throws IOException, RocksDBException, 
ClassNotFoundException {
                        try {
                                currentStateHandleInStream = 
currentKeyGroupsStateHandle.getStateHandle().openInputStream();
+                               
rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
                                currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
                                restoreKVStateMetaData();
                                restoreKVStateData();
                        } finally {
-                               if(currentStateHandleInStream != null) {
+                               if (currentStateHandleInStream != null) {
+                                       
rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterClosable(currentStateHandleInStream);
                                        currentStateHandleInStream.close();
                                }
                        }
-
                }
 
                /**
@@ -652,19 +654,20 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
                        //restore the empty columns for the k/v states through 
the metadata
                        for (int i = 0; i < numColumns; i++) {
 
-                               StateDescriptor stateDescriptor = 
InstantiationUtil.deserializeObject(
+                               StateDescriptor<?, ?> stateDescriptor = 
(StateDescriptor<?, ?>) InstantiationUtil.deserializeObject(
                                                currentStateHandleInStream,
                                                
rocksDBKeyedStateBackend.userCodeClassLoader);
 
-                               Tuple2<ColumnFamilyHandle, StateDescriptor> 
columnFamily = rocksDBKeyedStateBackend.
+                               Tuple2<ColumnFamilyHandle, StateDescriptor<?, 
?>> columnFamily = rocksDBKeyedStateBackend.
                                                
kvStateInformation.get(stateDescriptor.getName());
 
-                               if(null == columnFamily) {
+                               if (null == columnFamily) {
                                        ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
                                                        
stateDescriptor.getName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
 
-                                       columnFamily = new 
Tuple2<>(rocksDBKeyedStateBackend.db.
-                                                       
createColumnFamily(columnFamilyDescriptor), stateDescriptor);
+                                       columnFamily = new 
Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>(
+                                                       
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor), 
stateDescriptor);
+
                                        
rocksDBKeyedStateBackend.kvStateInformation.put(stateDescriptor.getName(), 
columnFamily);
                                }
 
@@ -727,9 +730,9 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
         * <p>This also checks whether the {@link StateDescriptor} for a state 
matches the one
         * that we checkpointed, i.e. is already in the map of column families.
         */
-       protected ColumnFamilyHandle getColumnFamily(StateDescriptor 
descriptor) {
+       protected ColumnFamilyHandle getColumnFamily(StateDescriptor<?, ?> 
descriptor) {
 
-               Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo = 
kvStateInformation.get(descriptor.getName());
+               Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> stateInfo = 
kvStateInformation.get(descriptor.getName());
 
                if (stateInfo != null) {
                        if (!stateInfo.f1.equals(descriptor)) {
@@ -744,7 +747,9 @@ public class RocksDBKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
 
                try {
                        ColumnFamilyHandle columnFamily = 
db.createColumnFamily(columnDescriptor);
-                       kvStateInformation.put(descriptor.getName(), new 
Tuple2<>(columnFamily, descriptor));
+                       Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> tuple 
=
+                                       new Tuple2<ColumnFamilyHandle, 
StateDescriptor<?, ?>>(columnFamily, descriptor);
+                       kvStateInformation.put(descriptor.getName(), tuple);
                        return columnFamily;
                } catch (RocksDBException e) {
                        throw new RuntimeException("Error creating 
ColumnFamilyHandle.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index b6ce224..a0c980b 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -23,11 +23,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
@@ -224,7 +224,7 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        }
 
        @Override
-       public <K> KeyedStateBackend<K> createKeyedStateBackend(
+       public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
                        Environment env,
                        JobID jobID,
                        String operatorIdentifier,
@@ -251,7 +251,9 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        }
 
        @Override
-       public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment 
env, JobID jobID,
+       public <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend(
+                       Environment env,
+                       JobID jobID,
                        String operatorIdentifier,
                        TypeSerializer<K> keySerializer,
                        int numberOfKeyGroups,

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index c0c9ca1..bccbabc 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -29,10 +29,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
@@ -66,7 +64,6 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.Arrays;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutorService;
@@ -138,12 +135,11 @@ public class RocksDBAsyncSnapshotTest {
                        @Override
                        public void acknowledgeCheckpoint(
                                        long checkpointId,
-                                       ChainedStateHandle<StreamStateHandle> 
chainedStateHandle, 
-                                       List<KeyGroupsStateHandle> 
keyGroupStateHandles,
+                                       CheckpointStateHandles 
checkpointStateHandles,
                                        long synchronousDurationMillis, long 
asynchronousDurationMillis,
                                        long bytesBufferedInAlignment, long 
alignmentDurationNanos) {
 
-                               super.acknowledgeCheckpoint(checkpointId, 
chainedStateHandle, keyGroupStateHandles,
+                               super.acknowledgeCheckpoint(checkpointId, 
checkpointStateHandles,
                                                synchronousDurationMillis, 
asynchronousDurationMillis,
                                                bytesBufferedInAlignment, 
alignmentDurationNanos);
 
@@ -156,7 +152,7 @@ public class RocksDBAsyncSnapshotTest {
                                }
 
                                // should be only one k/v state
-                               assertEquals(1, keyGroupStateHandles.size());
+                               assertEquals(1, 
checkpointStateHandles.getKeyGroupsStateHandle().size());
 
                                // we now know that the checkpoint went through
                                ensureCheckpointLatch.trigger();

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 3b851be..07fc27c 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
-
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.util.OperatingSystem;
@@ -34,7 +33,6 @@ import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-
 import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionStyle;
@@ -45,8 +43,18 @@ import java.io.File;
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.startsWith;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 /**
@@ -88,13 +96,15 @@ public class RocksDBStateBackendConfigTest {
                assertArrayEquals(new String[] { testDir1.getAbsolutePath(), 
testDir2.getAbsolutePath() }, rocksDbBackend.getDbStoragePaths());
 
                Environment env = getMockEnvironment(new File[] {});
-               RocksDBKeyedStateBackend<Integer> keyedBackend = 
(RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(env,
-                               env.getJobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               1,
-                               new KeyGroupRange(0, 0),
-                               env.getTaskKvStateRegistry());
+               RocksDBKeyedStateBackend<Integer> keyedBackend = 
(RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
+                               createKeyedStateBackend(
+                                               env,
+                                               env.getJobID(),
+                                               "test_op",
+                                               IntSerializer.INSTANCE,
+                                               1,
+                                               new KeyGroupRange(0, 0),
+                                               env.getTaskKvStateRegistry());
 
 
                File instanceBasePath = keyedBackend.getInstanceBasePath();
@@ -142,13 +152,15 @@ public class RocksDBStateBackendConfigTest {
                assertNull(rocksDbBackend.getDbStoragePaths());
 
                Environment env = getMockEnvironment(tempDirs);
-               RocksDBKeyedStateBackend<Integer> keyedBackend = 
(RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(env,
-                               env.getJobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               1,
-                               new KeyGroupRange(0, 0),
-                               env.getTaskKvStateRegistry());
+               RocksDBKeyedStateBackend<Integer> keyedBackend = 
(RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
+                               createKeyedStateBackend(
+                                               env,
+                                               env.getJobID(),
+                                               "test_op",
+                                               IntSerializer.INSTANCE,
+                                               1,
+                                               new KeyGroupRange(0, 0),
+                                               env.getTaskKvStateRegistry());
 
 
                File instanceBasePath = keyedBackend.getInstanceBasePath();

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index a9e8da9..ce513cb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -18,12 +18,8 @@
 
 package org.apache.flink.api.common.functions;
 
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
@@ -33,14 +29,16 @@ import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.metrics.MetricGroup;
 
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
 /**
  * A RuntimeContext contains information about the context in which functions 
are executed. Each parallel instance
  * of the function will have a context through which it can access static 
contextual information (such as 
@@ -347,117 +345,4 @@ public interface RuntimeContext {
         */
        @PublicEvolving
        <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> 
stateProperties);
-       
-       /**
-        * Gets the key/value state, which is only accessible if the function 
is executed on
-        * a KeyedStream. Upon calling {@link ValueState#value()}, the 
key/value state will
-        * return the value bound to the key of the element currently processed 
by the function.
-        * Each operator may maintain multiple key/value states, addressed with 
different names.
-        *
-        * <p>Because the scope of each value is the key of the currently 
processed element,
-        * and the elements are distributed by the Flink runtime, the system 
can transparently
-        * scale out and redistribute the state and KeyedStream.
-        *
-        * <p>The following code example shows how to implement a continuous 
counter that counts
-        * how many times elements of a certain key occur, and emits an updated 
count for that
-        * element on each occurrence. 
-        *
-        * <pre>{@code
-        * DataStream<MyType> stream = ...;
-        * KeyedStream<MyType> keyedStream = stream.keyBy("id");     
-        *
-        * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
-        *
-        *     private State<Long> state;
-        *
-        *     public void open(Configuration cfg) {
-        *         state = getRuntimeContext().getKeyValueState(Long.class, 0L);
-        *     }
-        *
-        *     public Tuple2<MyType, Long> map(MyType value) {
-        *         long count = state.value();
-        *         state.update(value + 1);
-        *         return new Tuple2<>(value, count);
-        *     }
-        * });
-        *
-        * }</pre>
-        *
-        * <p>This method attempts to deduce the type information from the 
given type class. If the
-        * full type cannot be determined from the class (for example because 
of generic parameters),
-        * the TypeInformation object must be manually passed via 
-        * {@link #getKeyValueState(String, TypeInformation, Object)}. 
-        * 
-        *
-        * @param name The name of the key/value state.
-        * @param stateType The class of the type that is stored in the state. 
Used to generate
-        *                  serializers for managed memory and checkpointing.
-        * @param defaultState The default state value, returned when the state 
is accessed and
-        *                     no value has yet been set for the key. May be 
null.
-        *
-        * @param <S> The type of the state.
-        *
-        * @return The key/value state access.
-        *
-        * @throws UnsupportedOperationException Thrown, if no key/value state 
is available for the
-        *                                       function (function is not part 
os a KeyedStream).
-        * 
-        * @deprecated Use the more expressive {@link 
#getState(ValueStateDescriptor)} instead.
-        */
-       @Deprecated
-       @PublicEvolving
-       <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, 
S defaultState);
-
-       /**
-        * Gets the key/value state, which is only accessible if the function 
is executed on
-        * a KeyedStream. Upon calling {@link ValueState#value()}, the 
key/value state will
-        * return the value bound to the key of the element currently processed 
by the function.
-        * Each operator may maintain multiple key/value states, addressed with 
different names.
-        * 
-        * <p>Because the scope of each value is the key of the currently 
processed element,
-        * and the elements are distributed by the Flink runtime, the system 
can transparently
-        * scale out and redistribute the state and KeyedStream.
-        * 
-        * <p>The following code example shows how to implement a continuous 
counter that counts
-        * how many times elements of a certain key occur, and emits an updated 
count for that
-        * element on each occurrence. 
-        * 
-        * <pre>{@code
-        * DataStream<MyType> stream = ...;
-        * KeyedStream<MyType> keyedStream = stream.keyBy("id");     
-        * 
-        * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
-        * 
-        *     private State<Long> state;
-        *     
-        *     public void open(Configuration cfg) {
-        *         state = getRuntimeContext().getKeyValueState(Long.class, 0L);
-        *     }
-        *     
-        *     public Tuple2<MyType, Long> map(MyType value) {
-        *         long count = state.value();
-        *         state.update(value + 1);
-        *         return new Tuple2<>(value, count);
-        *     }
-        * });
-        *     
-        * }</pre>
-        * 
-        * @param name The name of the key/value state.
-        * @param stateType The type information for the type that is stored in 
the state.
-        *                  Used to create serializers for managed memory and 
checkpoints.
-        * @param defaultState The default state value, returned when the state 
is accessed and
-        *                     no value has yet been set for the key. May be 
null.
-        * @param <S> The type of the state.
-        *
-        * @return The key/value state access.
-        * 
-        * @throws UnsupportedOperationException Thrown, if no key/value state 
is available for the
-        *                                       function (function is not part 
os a KeyedStream).
-        * 
-        * @deprecated Use the more expressive {@link 
#getState(ValueStateDescriptor)} instead.
-        */
-       @Deprecated
-       @PublicEvolving
-       <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> 
stateType, S defaultState);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 6645964..4f559bf 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.api.common.functions.util;
 
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.Future;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
@@ -36,15 +31,18 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.MetricGroup;
 
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Future;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -207,20 +205,4 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
                throw new UnsupportedOperationException(
                                "This state is only accessible by functions 
executed on a KeyedStream");
        }
-
-       @Override
-       @Deprecated
-       @PublicEvolving
-       public <S> OperatorState<S> getKeyValueState(String name, Class<S> 
stateType, S defaultState) {
-               throw new UnsupportedOperationException(
-                               "This state is only accessible by functions 
executed on a KeyedStream");
-       }
-
-       @Override
-       @Deprecated
-       @PublicEvolving
-       public <S> OperatorState<S> getKeyValueState(String name, 
TypeInformation<S> stateType, S defaultState) {
-               throw new UnsupportedOperationException(
-                               "This state is only accessible by functions 
executed on a KeyedStream");
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
deleted file mode 100644
index ac4ed07..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import java.io.IOException;
-
-/**
- * This state interface abstracts persistent key/value state in streaming 
programs.
- * The state is accessed and modified by user functions, and checkpointed 
consistently
- * by the system as part of the distributed snapshots.
- *
- * <p>The state is only accessible by functions applied on a KeyedDataStream. 
The key is
- * automatically supplied by the system, so the function always sees the value 
mapped to the
- * key of the current element. That way, the system can handle stream and 
state partitioning
- * consistently together.
- *
- * @param <T> Type of the value in the operator state
- * 
- * @deprecated OperatorState has been replaced by {@link ValueState}.
- */
-@Deprecated
-@PublicEvolving
-public interface OperatorState<T> extends State {
-
-       /**
-        * Returns the current value for the state. When the state is not
-        * partitioned the returned value is the same for all inputs in a given
-        * operator instance. If state partitioning is applied, the value 
returned
-        * depends on the current operator input, as the operator maintains an
-        * independent state for each partition.
-        *
-        * @return The operator state value corresponding to the current input.
-        *
-        * @throws IOException Thrown if the system cannot access the state.
-        */
-       T value() throws IOException;
-
-       /**
-        * Updates the operator state accessible by {@link #value()} to the 
given
-        * value. The next time {@link #value()} is called (for the same state
-        * partition) the returned state will represent the updated value. When 
a
-        * partitioned state is updated with null, the state for the current 
key 
-        * will be removed and the default value is returned on the next access.
-        *
-        * @param value
-        *            The new value for the state.
-        *
-        * @throws IOException Thrown if the system cannot access the state.
-        */
-       void update(T value) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
index 607cb32..de3250a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
@@ -37,7 +37,7 @@ import java.io.IOException;
  * @param <T> Type of the value in the state.
  */
 @PublicEvolving
-public interface ValueState<T> extends State, OperatorState<T> {
+public interface ValueState<T> extends State {
 
        /**
         * Returns the current value for the state. When the state is not

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
new file mode 100644
index 0000000..4ae00d1
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+public class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public TypeSerializer<T> duplicate() {
+               return this;
+       }
+
+       @Override
+       public T createInstance() {
+               return null;
+       }
+
+       @Override
+       public T copy(T from) {
+
+               try {
+                       return InstantiationUtil.clone(from);
+               } catch (IOException | ClassNotFoundException e) {
+                       throw new RuntimeException("Could not copy instance of 
" + from + '.', e);
+               }
+       }
+
+       @Override
+       public T copy(T from, T reuse) {
+               return copy(from);
+       }
+
+       @Override
+       public int getLength() {
+               return 0;
+       }
+
+       @Override
+       public void serialize(T record, DataOutputView target) throws 
IOException {
+               ObjectOutputStream oos = new ObjectOutputStream(new 
DataOutputViewStream(target));
+               oos.writeObject(record);
+               oos.flush();
+       }
+
+       @Override
+       public T deserialize(DataInputView source) throws IOException {
+               ObjectInputStream ois = new ObjectInputStream(new 
DataInputViewStream(source));
+
+               try {
+                       @SuppressWarnings("unchecked")
+                       T nfa = (T) ois.readObject();
+                       return nfa;
+               } catch (ClassNotFoundException e) {
+                       throw new RuntimeException("Could not deserialize 
NFA.", e);
+               }
+       }
+
+       @Override
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               return deserialize(source);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               int size = source.readInt();
+               target.writeInt(size);
+               target.write(source, size);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               return obj instanceof JavaSerializer && ((JavaSerializer<T>) 
obj).canEqual(this);
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof JavaSerializer;
+       }
+
+       @Override
+       public int hashCode() {
+               return getClass().hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index df40998..080485e 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.hdfstests;
 
 import org.apache.commons.io.FileUtils;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.FileStatus;
@@ -31,10 +30,8 @@ import org.apache.flink.runtime.state.StateBackendTestBase;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -243,16 +240,21 @@ public class FileStateBackendTest extends 
StateBackendTestBase<FsStateBackend> {
        }
 
        private static void validateBytesInStream(InputStream is, byte[] data) 
throws IOException {
-               byte[] holder = new byte[data.length];
 
-               int pos = 0;
-               int read;
-               while (pos < holder.length && (read = is.read(holder, pos, 
holder.length - pos)) != -1) {
-                       pos += read;
-               }
+               try {
+                       byte[] holder = new byte[data.length];
+
+                       int pos = 0;
+                       int read;
+                       while (pos < holder.length && (read = is.read(holder, 
pos, holder.length - pos)) != -1) {
+                               pos += read;
+                       }
 
-               assertEquals("not enough data", holder.length, pos);
-               assertEquals("too much data", -1, is.read());
-               assertArrayEquals("wrong data", data, holder);
+                       assertEquals("not enough data", holder.length, pos);
+                       assertEquals("too much data", -1, is.read());
+                       assertArrayEquals("wrong data", data, holder);
+               } finally {
+                       is.close();
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
index aad408c..2f21346 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -36,7 +37,7 @@ import java.util.PriorityQueue;
  */
 public abstract class AbstractCEPBasePatternOperator<IN, OUT>
        extends AbstractStreamOperator<OUT>
-       implements OneInputStreamOperator<IN, OUT> {
+       implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator {
 
        private static final long serialVersionUID = -4166778210774160757L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 64ffa2a..10bb6ff 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -104,7 +104,6 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> 
extends AbstractCEPBas
 
        @Override
        public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
-               super.snapshotState(out, checkpointId, timestamp);
                final ObjectOutputStream oos = new ObjectOutputStream(out);
 
                oos.writeObject(nfa);
@@ -119,7 +118,6 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> 
extends AbstractCEPBas
        @Override
        @SuppressWarnings("unchecked")
        public void restoreState(FSDataInputStream state) throws Exception {
-               super.restoreState(state);
                final ObjectInputStream ois = new ObjectInputStream(state);
 
                nfa = (NFA<IN>)ois.readObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 09773a2..07e2662 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -187,7 +187,6 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
 
        @Override
        public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
-               super.snapshotState(out, checkpointId, timestamp);
 
                DataOutputView ov = new DataOutputViewStreamWrapper(out);
                ov.writeInt(keys.size());
@@ -199,7 +198,6 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT> extends Abst
 
        @Override
        public void restoreState(FSDataInputStream state) throws Exception {
-               super.restoreState(state);
 
                DataInputView inputView = new DataInputViewStreamWrapper(state);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6a43ddf..4428427 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -34,9 +34,11 @@ import 
org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -45,7 +47,9 @@ import scala.concurrent.Future;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -398,9 +402,9 @@ public class CheckpointCoordinator {
                                return new 
CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
                        }
 
-               final PendingCheckpoint checkpoint = props.isSavepoint() ?
-                       new PendingSavepoint(job, checkpointID, timestamp, 
ackTasks, savepointStore) :
-                       new PendingCheckpoint(job, checkpointID, timestamp, 
ackTasks);
+                       final PendingCheckpoint checkpoint = 
props.isSavepoint() ?
+                                       new PendingSavepoint(job, checkpointID, 
timestamp, ackTasks, savepointStore) :
+                                       new PendingCheckpoint(job, 
checkpointID, timestamp, ackTasks);
 
                        // schedule the timer that will clean up the expired 
checkpoints
                        TimerTask canceller = new TimerTask() {
@@ -627,9 +631,8 @@ public class CheckpointCoordinator {
                                isPendingCheckpoint = true;
 
                                if (checkpoint.acknowledgeTask(
-                                       message.getTaskExecutionId(),
-                                       message.getStateHandle(),
-                                       message.getKeyGroupsStateHandle())) {
+                                               message.getTaskExecutionId(),
+                                               
message.getCheckpointStateHandles())) {
                                        if (checkpoint.isFullyAcknowledged()) {
                                                completed = 
checkpoint.finalizeCheckpoint();
 
@@ -640,7 +643,7 @@ public class CheckpointCoordinator {
 
                                                if (LOG.isDebugEnabled()) {
                                                        StringBuilder builder = 
new StringBuilder();
-                                                       for 
(Map.Entry<JobVertexID, TaskState> entry: completed.getTaskStates().entrySet()) 
{
+                                                       for 
(Map.Entry<JobVertexID, TaskState> entry : 
completed.getTaskStates().entrySet()) {
                                                                
builder.append("JobVertexID: ").append(entry.getKey()).append(" 
{").append(entry.getValue()).append("}");
                                                        }
 
@@ -654,8 +657,7 @@ public class CheckpointCoordinator {
 
                                                triggerQueuedRequests();
                                        }
-                               }
-                               else {
+                               } else {
                                        // checkpoint did not accept message
                                        LOG.error("Received duplicate or 
invalid acknowledge message for checkpoint " + checkpointId
                                                        + " , task " + 
message.getTaskExecutionId());
@@ -790,22 +792,80 @@ public class CheckpointCoordinator {
                                        }
 
 
+                                       int oldParallelism = 
taskState.getParallelism();
+                                       int newParallelism = 
executionJobVertex.getParallelism();
+                                       boolean parallelismChanged = 
oldParallelism != newParallelism;
                                        boolean hasNonPartitionedState = 
taskState.hasNonPartitionedState();
 
-                                       if (hasNonPartitionedState && 
taskState.getParallelism() != executionJobVertex.getParallelism()) {
+                                       if (hasNonPartitionedState && 
parallelismChanged) {
                                                throw new 
IllegalStateException("Cannot restore the latest checkpoint because " +
                                                        "the operator " + 
executionJobVertex.getJobVertexId() + " has non-partitioned " +
                                                        "state and its 
parallelism changed. The operator" + executionJobVertex.getJobVertexId() +
-                                                       " has parallelism " + 
executionJobVertex.getParallelism() + " whereas the corresponding" +
-                                                       "state object has a 
parallelism of " + taskState.getParallelism());
+                                                       " has parallelism " + 
newParallelism + " whereas the corresponding" +
+                                                       "state object has a 
parallelism of " + oldParallelism);
                                        }
 
                                        List<KeyGroupRange> keyGroupPartitions 
= createKeyGroupPartitions(
-                                               
executionJobVertex.getMaxParallelism(),
-                                               
executionJobVertex.getParallelism());
+                                                       
executionJobVertex.getMaxParallelism(),
+                                                       newParallelism);
+                                       
+                                       // operator chain index -> list of the 
stored partitionables states from all parallel instances
+                                       @SuppressWarnings("unchecked")
+                                       List<OperatorStateHandle>[] 
chainParallelStates =
+                                                       new 
List[taskState.getChainLength()];
+
+                                       for (int i = 0; i < oldParallelism; 
++i) {
+
+                                               
ChainedStateHandle<OperatorStateHandle> partitionableState =
+                                                               
taskState.getPartitionableState(i);
+
+                                               if (partitionableState != null) 
{
+                                                       for (int j = 0; j < 
partitionableState.getLength(); ++j) {
+                                                               
OperatorStateHandle opParalleState = partitionableState.get(j);
+                                                               if 
(opParalleState != null) {
+                                                                       
List<OperatorStateHandle> opParallelStates =
+                                                                               
        chainParallelStates[j];
+                                                                       if 
(opParallelStates == null) {
+                                                                               
opParallelStates = new ArrayList<>();
+                                                                               
chainParallelStates[j] = opParallelStates;
+                                                                       }
+                                                                       
opParallelStates.add(opParalleState);
+                                                               }
+                                                       }
+                                               }
+                                       }
+
+                                       // operator chain index -> lists with 
collected states (one collection for each parallel subtasks)
+                                       @SuppressWarnings("unchecked")
+                                       List<Collection<OperatorStateHandle>>[] 
redistributedParallelStates =
+                                                       new 
List[taskState.getChainLength()];
+
+                                       //TODO here we can employ different 
redistribution strategies for state, e.g. union state. For now we only offer 
round robin as the default.
+                                       OperatorStateRepartitioner 
repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
+
+                                       for (int i = 0; i < 
chainParallelStates.length; ++i) {
+                                               List<OperatorStateHandle> 
chainOpParallelStates = chainParallelStates[i];
+                                               if (chainOpParallelStates != 
null) {
+                                                       //We only redistribute 
if the parallelism of the operator changed from previous executions
+                                                       if (parallelismChanged) 
{
+                                                               
redistributedParallelStates[i] = repartitioner.repartitionState(
+                                                                               
chainOpParallelStates,
+                                                                               
newParallelism);
+                                                       } else {
+                                                               
List<Collection<OperatorStateHandle>> repacking = new 
ArrayList<>(newParallelism);
+                                                               for 
(OperatorStateHandle operatorStateHandle : chainOpParallelStates) {
+                                                                       
repacking.add(Collections.singletonList(operatorStateHandle));
+                                                               }
+                                                               
redistributedParallelStates[i] = repacking;
+                                                       }
+                                               }
+                                       }
 
                                        int counter = 0;
-                                       for (int i = 0; i < 
executionJobVertex.getParallelism(); i++) {
+
+                                       for (int i = 0; i < newParallelism; 
++i) {
+
+                                               // non-partitioned state
                                                
ChainedStateHandle<StreamStateHandle> state = null;
 
                                                if (hasNonPartitionedState) {
@@ -813,25 +873,46 @@ public class CheckpointCoordinator {
 
                                                        if (subtaskState != 
null) {
                                                                // count the 
number of executions for which we set a state
-                                                               counter++;
+                                                               ++counter;
                                                                state = 
subtaskState.getChainedStateHandle();
                                                        }
                                                }
 
+                                               // partitionable state
+                                               @SuppressWarnings("unchecked")
+                                               
Collection<OperatorStateHandle>[] ia = new 
Collection[taskState.getChainLength()];
+                                               
List<Collection<OperatorStateHandle>> subTaskPartitionableState = 
Arrays.asList(ia);
+
+                                               for (int j = 0; j < 
redistributedParallelStates.length; ++j) {
+                                                       
List<Collection<OperatorStateHandle>> redistributedParallelState =
+                                                                       
redistributedParallelStates[j];
+
+                                                       if 
(redistributedParallelState != null) {
+                                                               
subTaskPartitionableState.set(j, redistributedParallelState.get(i));
+                                                       }
+                                               }
+
+                                               // key-partitioned state
                                                KeyGroupRange 
subtaskKeyGroupIds = keyGroupPartitions.get(i);
 
-                                               List<KeyGroupsStateHandle> 
subtaskKeyGroupStates = getKeyGroupsStateHandles(
-                                                               
taskState.getKeyGroupStates(),
-                                                               
subtaskKeyGroupIds);
+                                               // Again, we only repartition 
if the parallelism changed
+                                               List<KeyGroupsStateHandle> 
subtaskKeyGroupStates = parallelismChanged ?
+                                                               
getKeyGroupsStateHandles(taskState.getKeyGroupStates(), subtaskKeyGroupIds)
+                                                               : 
Collections.singletonList(taskState.getKeyGroupState(i));
 
                                                Execution 
currentExecutionAttempt = executionJobVertex
                                                        .getTaskVertices()[i]
                                                        
.getCurrentExecutionAttempt();
 
-                                               
currentExecutionAttempt.setInitialState(state, subtaskKeyGroupStates);
+                                               CheckpointStateHandles 
checkpointStateHandles = new CheckpointStateHandles(
+                                                               state,
+                                                               
null/*subTaskPartionableState*/, //TODO chose right structure and put 
redistributed states here
+                                                               
subtaskKeyGroupStates);
+
+                                               
currentExecutionAttempt.setInitialState(checkpointStateHandles, 
subTaskPartitionableState);
                                        }
 
-                                       if (allOrNothingState && counter > 0 && 
counter < executionJobVertex.getParallelism()) {
+                                       if (allOrNothingState && counter > 0 && 
counter < newParallelism) {
                                                throw new 
IllegalStateException("The checkpoint contained state only for " +
                                                        "a subset of tasks for 
vertex " + executionJobVertex);
                                        }
@@ -859,7 +940,7 @@ public class CheckpointCoordinator {
 
                for (KeyGroupsStateHandle storedKeyGroup : allKeyGroupsHandles) 
{
                        KeyGroupsStateHandle intersection = 
storedKeyGroup.getKeyGroupIntersection(subtaskKeyGroupIds);
-                       if(intersection.getNumberOfKeyGroups() > 0) {
+                       if (intersection.getNumberOfKeyGroups() > 0) {
                                subtaskKeyGroupStates.add(intersection);
                        }
                }
@@ -881,7 +962,7 @@ public class CheckpointCoordinator {
        public static List<KeyGroupRange> createKeyGroupPartitions(int 
numberKeyGroups, int parallelism) {
                Preconditions.checkArgument(numberKeyGroups >= parallelism);
                List<KeyGroupRange> result = new ArrayList<>(parallelism);
-               int start = 0;
+
                for (int i = 0; i < parallelism; ++i) {
                        
result.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(numberKeyGroups,
 parallelism, i));
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 7cb3916..0d279f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -153,9 +153,4 @@ public class CompletedCheckpoint implements StateObject {
        public String toString() {
                return String.format("Checkpoint %d @ %d for %s", checkpointID, 
timestamp, job);
        }
-
-       @Override
-       public void close() throws IOException {
-               StateUtil.bestEffortCloseAllStateObjects(taskStates.values());
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
new file mode 100644
index 0000000..98810f1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Strategy interface for repartitioning operator state when the parallelism of an operator changes.
+ */
+public interface OperatorStateRepartitioner {
+
+       /**
+        * Redistributes the checkpointed operator state handles of all previous parallel subtasks over {@code parallelism} new subtasks.
+        * @param previousParallelSubtaskStates List of state handles to the parallel subtask states of an operator, as they have been checkpointed.
+        * @param parallelism                   The parallelism that we consider for the state redistribution. Determines the size of the
+        *                                      returned list.
+        * @return List with one entry per parallel subtask. Each entry is the collection of state handles that together form
+        *         the new total state for that subtask.
+        */
+       List<Collection<OperatorStateHandle>> repartitionState(
+                       List<OperatorStateHandle> previousParallelSubtaskStates,
+                       int parallelism);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index d499a5a..2ca9d69 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -23,7 +23,9 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
@@ -167,49 +169,80 @@ public class PendingCheckpoint {
        }
        
        public boolean acknowledgeTask(
-               ExecutionAttemptID attemptID,
-               ChainedStateHandle<StreamStateHandle> state,
-               List<KeyGroupsStateHandle> keyGroupsState) {
+                       ExecutionAttemptID attemptID,
+                       CheckpointStateHandles checkpointStateHandles) {
 
                synchronized (lock) {
                        if (discarded) {
                                return false;
                        }
-                       
-                       ExecutionVertex vertex = 
notYetAcknowledgedTasks.remove(attemptID);
-                       if (vertex != null) {
-                               if (state != null || keyGroupsState != null) {
 
-                                       JobVertexID jobVertexID = 
vertex.getJobvertexId();
+                       ExecutionVertex vertex = 
notYetAcknowledgedTasks.remove(attemptID);
 
-                                       TaskState taskState;
+                       if (vertex != null) {
 
-                                       if 
(taskStates.containsKey(jobVertexID)) {
-                                               taskState = 
taskStates.get(jobVertexID);
-                                       } else {
-                                               taskState = new 
TaskState(jobVertexID, vertex.getTotalNumberOfParallelSubtasks(), 
vertex.getMaxParallelism());
-                                               taskStates.put(jobVertexID, 
taskState);
+                               if (checkpointStateHandles != null) {
+                                       List<KeyGroupsStateHandle> 
keyGroupsState = checkpointStateHandles.getKeyGroupsStateHandle();
+                                       ChainedStateHandle<StreamStateHandle> 
nonPartitionedState =
+                                                       
checkpointStateHandles.getNonPartitionedStateHandles();
+                                       ChainedStateHandle<OperatorStateHandle> 
partitioneableState =
+                                                       
checkpointStateHandles.getPartitioneableStateHandles();
+
+                                       if (nonPartitionedState != null || 
partitioneableState != null || keyGroupsState != null) {
+
+                                               JobVertexID jobVertexID = 
vertex.getJobvertexId();
+
+                                               int subtaskIndex = 
vertex.getParallelSubtaskIndex();
+
+                                               TaskState taskState;
+
+                                               if 
(taskStates.containsKey(jobVertexID)) {
+                                                       taskState = 
taskStates.get(jobVertexID);
+                                               } else {
+                                                       //TODO this should go 
away when we remove chained state, assigning state to operators directly instead
+                                                       int chainLength;
+                                                       if (nonPartitionedState 
!= null) {
+                                                               chainLength = 
nonPartitionedState.getLength();
+                                                       } else if 
(partitioneableState != null) {
+                                                               chainLength = 
partitioneableState.getLength();
+                                                       } else {
+                                                               chainLength = 1;
+                                                       }
+
+                                                       taskState = new 
TaskState(
+                                                               jobVertexID,
+                                                               
vertex.getTotalNumberOfParallelSubtasks(),
+                                                               
vertex.getMaxParallelism(),
+                                                               chainLength);
+
+                                                       
taskStates.put(jobVertexID, taskState);
+                                               }
+
+                                               long duration = 
System.currentTimeMillis() - checkpointTimestamp;
+
+                                               if (nonPartitionedState != 
null) {
+                                                       taskState.putState(
+                                                                       
subtaskIndex,
+                                                                       new 
SubtaskState(nonPartitionedState, duration));
+                                               }
+
+                                               if(partitioneableState != null 
&& !partitioneableState.isEmpty()) {
+                                                       
taskState.putPartitionableState(subtaskIndex, partitioneableState);
+                                               }
+
+                                               // currently a checkpoint can 
only contain keyed state
+                                               // for the head operator
+                                               if (keyGroupsState != null && 
!keyGroupsState.isEmpty()) {
+                                                       KeyGroupsStateHandle 
keyGroupsStateHandle = keyGroupsState.get(0);
+                                                       
taskState.putKeyedState(subtaskIndex, keyGroupsStateHandle);
+                                               }
                                        }
+                               }
 
-                                       long duration = 
System.currentTimeMillis() - checkpointTimestamp;
+                               ++numAcknowledgedTasks;
 
-                                       if (state != null) {
-                                               taskState.putState(
-                                                       
vertex.getParallelSubtaskIndex(),
-                                                       new SubtaskState(state, 
duration));
-                                       }
-
-                                       // currently a checkpoint can only 
contain keyed state
-                                       // for the head operator
-                                       if (keyGroupsState != null && 
!keyGroupsState.isEmpty()) {
-                                               KeyGroupsStateHandle 
keyGroupsStateHandle = keyGroupsState.get(0);
-                                               
taskState.putKeyedState(vertex.getParallelSubtaskIndex(), keyGroupsStateHandle);
-                                       }
-                               }
-                               numAcknowledgedTasks++;
                                return true;
-                       }
-                       else {
+                       } else {
                                return false;
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
new file mode 100644
index 0000000..09a35f6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Current default implementation of {@link OperatorStateRepartitioner} that 
redistributes state in round robin fashion.
+ */
+public class RoundRobinOperatorStateRepartitioner implements 
OperatorStateRepartitioner {
+
+       public static final OperatorStateRepartitioner INSTANCE = new 
RoundRobinOperatorStateRepartitioner();
+       private static final boolean OPTIMIZE_MEMORY_USE = false;
+
+       @Override
+       public List<Collection<OperatorStateHandle>> repartitionState(
+                       List<OperatorStateHandle> previousParallelSubtaskStates,
+                       int parallelism) {
+
+               Preconditions.checkNotNull(previousParallelSubtaskStates);
+               Preconditions.checkArgument(parallelism > 0);
+
+               // Reorganize: group by (State Name -> StreamStateHandle + 
Offsets)
+               Map<String, List<Tuple2<StreamStateHandle, long[]>>> 
nameToState =
+                               groupByStateName(previousParallelSubtaskStates);
+
+               if (OPTIMIZE_MEMORY_USE) {
+                       previousParallelSubtaskStates.clear(); // free for GC 
at to cost that old handles are no longer available
+               }
+
+               // Assemble result from all merge maps
+               List<Collection<OperatorStateHandle>> result = new 
ArrayList<>(parallelism);
+
+               // Do the actual repartitioning for all named states
+               List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList =
+                               repartition(nameToState, parallelism);
+
+               for (int i = 0; i < mergeMapList.size(); ++i) {
+                       result.add(i, new 
ArrayList<>(mergeMapList.get(i).values()));
+               }
+
+               return result;
+       }
+
+       /**
+        * Group by the different named states.
+        */
+       private Map<String, List<Tuple2<StreamStateHandle, long[]>>> 
groupByStateName(
+                       List<OperatorStateHandle> 
previousParallelSubtaskStates) {
+
+               //Reorganize: group by (State Name -> StreamStateHandle + 
Offsets)
+               Map<String, List<Tuple2<StreamStateHandle, long[]>>> 
nameToState = new HashMap<>();
+               for (OperatorStateHandle psh : previousParallelSubtaskStates) {
+
+                       for (Map.Entry<String, long[]> e : 
psh.getStateNameToPartitionOffsets().entrySet()) {
+
+                               List<Tuple2<StreamStateHandle, long[]>> 
stateLocations = nameToState.get(e.getKey());
+
+                               if (stateLocations == null) {
+                                       stateLocations = new ArrayList<>();
+                                       nameToState.put(e.getKey(), 
stateLocations);
+                               }
+
+                               stateLocations.add(new 
Tuple2<>(psh.getDelegateStateHandle(), e.getValue()));
+                       }
+               }
+               return nameToState;
+       }
+
+       /**
+        * Repartition all named states.
+        */
+       private List<Map<StreamStateHandle, OperatorStateHandle>> repartition(
+                       Map<String, List<Tuple2<StreamStateHandle, long[]>>> 
nameToState, int parallelism) {
+
+               // We will use this to merge w.r.t. StreamStateHandles for each 
parallel subtask inside the maps
+               List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList 
= new ArrayList<>(parallelism);
+               // Initialize
+               for (int i = 0; i < parallelism; ++i) {
+                       mergeMapList.add(new HashMap<StreamStateHandle, 
OperatorStateHandle>());
+               }
+
+               int startParallelOP = 0;
+               // Iterate all named states and repartition one named state at 
a time per iteration
+               for (Map.Entry<String, List<Tuple2<StreamStateHandle, long[]>>> 
e : nameToState.entrySet()) {
+
+                       List<Tuple2<StreamStateHandle, long[]>> current = 
e.getValue();
+
+                       // Determine actual number of partitions for this named 
state
+                       int totalPartitions = 0;
+                       for (Tuple2<StreamStateHandle, long[]> offsets : 
current) {
+                               totalPartitions += offsets.f1.length;
+                       }
+
+                       // Repartition the state across the parallel operator 
instances
+                       int lstIdx = 0;
+                       int offsetIdx = 0;
+                       int baseFraction = totalPartitions / parallelism;
+                       int remainder = totalPartitions % parallelism;
+
+                       int newStartParallelOp = startParallelOP;
+
+                       for (int i = 0; i < parallelism; ++i) {
+
+                               // Preparation: calculate the actual index 
considering wrap around
+                               int parallelOpIdx = (i + startParallelOP) % 
parallelism;
+
+                               // Now calculate the number of partitions we 
will assign to the parallel instance in this round ...
+                               int numberOfPartitionsToAssign = baseFraction;
+
+                               // ... and distribute odd partitions while we 
still have some, one at a time
+                               if (remainder > 0) {
+                                       ++numberOfPartitionsToAssign;
+                                       --remainder;
+                               } else if (remainder == 0) {
+                                       // We are out of odd partitions now and 
begin our next redistribution round with the current
+                                       // parallel operator to ensure fair 
load balance
+                                       newStartParallelOp = parallelOpIdx;
+                                       --remainder;
+                               }
+
+                               // Now start collection the partitions for the 
parallel instance into this list
+                               List<Tuple2<StreamStateHandle, long[]>> 
parallelOperatorState = new ArrayList<>();
+
+                               while (numberOfPartitionsToAssign > 0) {
+                                       Tuple2<StreamStateHandle, long[]> 
handleWithOffsets = current.get(lstIdx);
+                                       long[] offsets = handleWithOffsets.f1;
+                                       int remaining = offsets.length - 
offsetIdx;
+                                       // Repartition offsets
+                                       long[] offs;
+                                       if (remaining > 
numberOfPartitionsToAssign) {
+                                               offs = 
Arrays.copyOfRange(offsets, offsetIdx, offsetIdx + numberOfPartitionsToAssign);
+                                               offsetIdx += 
numberOfPartitionsToAssign;
+                                       } else {
+                                               if (OPTIMIZE_MEMORY_USE) {
+                                                       handleWithOffsets.f1 = 
null; // GC
+                                               }
+                                               offs = 
Arrays.copyOfRange(offsets, offsetIdx, offsets.length);
+                                               offsetIdx = 0;
+                                               ++lstIdx;
+                                       }
+
+                                       parallelOperatorState.add(
+                                                       new 
Tuple2<>(handleWithOffsets.f0, offs));
+
+                                       numberOfPartitionsToAssign -= remaining;
+
+                                       // As a last step we merge partitions 
that use the same StreamStateHandle in a single
+                                       // OperatorStateHandle
+                                       Map<StreamStateHandle, 
OperatorStateHandle> mergeMap = mergeMapList.get(parallelOpIdx);
+                                       OperatorStateHandle psh = 
mergeMap.get(handleWithOffsets.f0);
+                                       if (psh == null) {
+                                               psh = new 
OperatorStateHandle(handleWithOffsets.f0, new HashMap<String, long[]>());
+                                               
mergeMap.put(handleWithOffsets.f0, psh);
+                                       }
+                                       
psh.getStateNameToPartitionOffsets().put(e.getKey(), offs);
+                               }
+                       }
+                       startParallelOP = newStartParallelOp;
+                       e.setValue(null);
+               }
+               return mergeMapList;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
index 9beb233..2aa0491 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
@@ -24,8 +24,6 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -112,11 +110,4 @@ public class SubtaskState implements StateObject {
        public String toString() {
                return String.format("SubtaskState(Size: %d, Duration: %d, 
State: %s)", stateSize, duration, chainedStateHandle);
        }
-
-       @Override
-       public void close() throws IOException {
-               chainedStateHandle.close();
-       }
-
-
 }

Reply via email to