http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index f5e3618..7e4eded 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.checkpoint;
 
 import com.google.common.collect.Iterables;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.util.Preconditions;
@@ -47,27 +49,36 @@ public class TaskState implements StateObject {
        /** handles to non-partitioned states, subtaskindex -> subtaskstate */
        private final Map<Integer, SubtaskState> subtaskStates;
 
-       /** handles to partitioned states, subtaskindex -> keyed state */
+       /** handles to partitionable states, subtaskindex -> partitionable 
state */
+       private final Map<Integer, ChainedStateHandle<OperatorStateHandle>> 
partitionableStates;
+
+       /** handles to key-partitioned states, subtaskindex -> keyed state */
        private final Map<Integer, KeyGroupsStateHandle> keyGroupsStateHandles;
 
+
        /** parallelism of the operator when it was checkpointed */
        private final int parallelism;
 
        /** maximum parallelism of the operator when the job was first created 
*/
        private final int maxParallelism;
 
-       public TaskState(JobVertexID jobVertexID, int parallelism, int 
maxParallelism) {
+       private final int chainLength;
+
+       public TaskState(JobVertexID jobVertexID, int parallelism, int 
maxParallelism, int chainLength) {
                Preconditions.checkArgument(
                                parallelism <= maxParallelism,
                                "Parallelism " + parallelism + " is not smaller 
or equal to max parallelism " + maxParallelism + ".");
+               Preconditions.checkArgument(chainLength > 0, "There has to be 
at least one operator in the operator chain.");
 
                this.jobVertexID = jobVertexID;
 
                this.subtaskStates = new HashMap<>(parallelism);
+               this.partitionableStates = new HashMap<>(parallelism);
                this.keyGroupsStateHandles = new HashMap<>(parallelism);
 
                this.parallelism = parallelism;
                this.maxParallelism = maxParallelism;
+               this.chainLength = chainLength;
        }
 
        public JobVertexID getJobVertexID() {
@@ -85,6 +96,20 @@ public class TaskState implements StateObject {
                }
        }
 
+       /**
+        * Registers the partitionable operator state of a single subtask.
+        *
+        * @param subtaskIndex index of the subtask, must lie in {@code [0, parallelism)}
+        * @param partitionableState chained handle to the partitionable state, must not be null
+        * @throws IndexOutOfBoundsException if {@code subtaskIndex} is outside {@code [0, parallelism)}
+        */
+       public void putPartitionableState(
+                       int subtaskIndex,
+                       ChainedStateHandle<OperatorStateHandle> partitionableState) {
+
+               Preconditions.checkNotNull(partitionableState);
+
+               if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+                       // Report the bound that is actually checked (parallelism), not the number of
+                       // entries currently held in the unrelated subtaskStates map.
+                       throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
+                                       " exceeds the maximum number of sub tasks " + parallelism);
+               } else {
+                       partitionableStates.put(subtaskIndex, partitionableState);
+               }
+       }
+
        public void putKeyedState(int subtaskIndex, KeyGroupsStateHandle 
keyGroupsStateHandle) {
                Preconditions.checkNotNull(keyGroupsStateHandle);
 
@@ -106,6 +131,15 @@ public class TaskState implements StateObject {
                }
        }
 
+       /**
+        * Looks up the partitionable operator state registered for a single subtask.
+        *
+        * @param subtaskIndex index of the subtask, must lie in {@code [0, parallelism)}
+        * @return the chained state handle stored for the subtask, or {@code null} if none was registered
+        * @throws IndexOutOfBoundsException if {@code subtaskIndex} is outside {@code [0, parallelism)}
+        */
+       public ChainedStateHandle<OperatorStateHandle> getPartitionableState(int subtaskIndex) {
+               if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+                       // Report the bound that is actually checked (parallelism), not the number of
+                       // entries currently held in the unrelated subtaskStates map.
+                       throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
+                                       " exceeds the maximum number of sub tasks " + parallelism);
+               } else {
+                       return partitionableStates.get(subtaskIndex);
+               }
+       }
+
        public KeyGroupsStateHandle getKeyGroupState(int subtaskIndex) {
                if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
                        throw new IndexOutOfBoundsException("The given sub task 
index " + subtaskIndex +
@@ -131,6 +165,10 @@ public class TaskState implements StateObject {
                return maxParallelism;
        }
 
+       public int getChainLength() {
+               return chainLength;
+       }
+
        public Collection<KeyGroupsStateHandle> getKeyGroupStates() {
                return keyGroupsStateHandles.values();
        }
@@ -147,7 +185,7 @@ public class TaskState implements StateObject {
        @Override
        public void discardState() throws Exception {
                StateUtil.bestEffortDiscardAllStateObjects(
-                               Iterables.concat(subtaskStates.values(), 
keyGroupsStateHandles.values()));
+                               Iterables.concat(subtaskStates.values(), 
partitionableStates.values(), keyGroupsStateHandles.values()));
        }
 
 
@@ -156,11 +194,19 @@ public class TaskState implements StateObject {
                long result = 0L;
 
                for (int i = 0; i < parallelism; i++) {
-                       if (subtaskStates.get(i) != null) {
-                               result += subtaskStates.get(i).getStateSize();
+                       SubtaskState subtaskState = subtaskStates.get(i);
+                       if (subtaskState != null) {
+                               result += subtaskState.getStateSize();
+                       }
+
+                       ChainedStateHandle<OperatorStateHandle> 
partitionableState = partitionableStates.get(i);
+                       if (partitionableState != null) {
+                               result += partitionableState.getStateSize();
                        }
-                       if (keyGroupsStateHandles.get(i) != null) {
-                               result += 
keyGroupsStateHandles.get(i).getStateSize();
+
+                       KeyGroupsStateHandle keyGroupsState = 
keyGroupsStateHandles.get(i);
+                       if (keyGroupsState != null) {
+                               result += keyGroupsState.getStateSize();
                        }
                }
 
@@ -172,8 +218,11 @@ public class TaskState implements StateObject {
                if (obj instanceof TaskState) {
                        TaskState other = (TaskState) obj;
 
-                       return jobVertexID.equals(other.jobVertexID) && 
parallelism == other.parallelism &&
-                               subtaskStates.equals(other.subtaskStates) && 
keyGroupsStateHandles.equals(other.keyGroupsStateHandles);
+                       return jobVertexID.equals(other.jobVertexID)
+                                       && parallelism == other.parallelism
+                                       && 
subtaskStates.equals(other.subtaskStates)
+                                       && 
partitionableStates.equals(other.partitionableStates)
+                                       && 
keyGroupsStateHandles.equals(other.keyGroupsStateHandles);
                } else {
                        return false;
                }
@@ -181,13 +230,7 @@ public class TaskState implements StateObject {
 
        @Override
        public int hashCode() {
-               return parallelism + 31 * Objects.hash(jobVertexID, 
subtaskStates, keyGroupsStateHandles);
-       }
-
-       @Override
-       public void close() throws IOException {
-               StateUtil.bestEffortCloseAllStateObjects(
-                               Iterables.concat(subtaskStates.values(), 
keyGroupsStateHandles.values()));
+               return parallelism + 31 * Objects.hash(jobVertexID, 
subtaskStates, partitionableStates, keyGroupsStateHandles);
        }
 
        public Map<Integer, SubtaskState> getSubtaskStates() {
@@ -197,4 +240,8 @@ public class TaskState implements StateObject {
        public Map<Integer, KeyGroupsStateHandle> getKeyGroupsStateHandles() {
                return Collections.unmodifiableMap(keyGroupsStateHandles);
        }
+
+       /**
+        * Returns a read-only view of the partitionable states, keyed by subtask index.
+        * The view is unmodifiable so that entries can only be added through
+        * {@code putPartitionableState}, which validates the subtask index.
+        */
+       public Map<Integer, ChainedStateHandle<OperatorStateHandle>> getPartitionableStates() {
+               return Collections.unmodifiableMap(partitionableStates);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index f07f44f..536062a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -35,6 +36,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -51,6 +53,7 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
        private static final byte BYTE_STREAM_STATE_HANDLE = 1;
        private static final byte FILE_STREAM_STATE_HANDLE = 2;
        private static final byte KEY_GROUPS_HANDLE = 3;
+       private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
 
 
        public static final SavepointV1Serializer INSTANCE = new 
SavepointV1Serializer();
@@ -75,8 +78,9 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                                int parallelism = taskState.getParallelism();
                                dos.writeInt(parallelism);
                                dos.writeInt(taskState.getMaxParallelism());
+                               dos.writeInt(taskState.getChainLength());
 
-                               // Sub task states
+                               // Sub task non-partitionable states
                                Map<Integer, SubtaskState> subtaskStateMap = 
taskState.getSubtaskStates();
                                dos.writeInt(subtaskStateMap.size());
                                for (Map.Entry<Integer, SubtaskState> entry : 
subtaskStateMap.entrySet()) {
@@ -93,7 +97,22 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                                        
dos.writeLong(subtaskState.getDuration());
                                }
 
+                               // Sub task partitionable states
+                               Map<Integer, 
ChainedStateHandle<OperatorStateHandle>> partitionableStatesMap = 
taskState.getPartitionableStates();
+                               dos.writeInt(partitionableStatesMap.size());
 
+                               for (Map.Entry<Integer, 
ChainedStateHandle<OperatorStateHandle>> entry : 
partitionableStatesMap.entrySet()) {
+                                       dos.writeInt(entry.getKey());
+
+                                       ChainedStateHandle<OperatorStateHandle> 
chainedStateHandle = entry.getValue();
+                                       
dos.writeInt(chainedStateHandle.getLength());
+                                       for (int j = 0; j < 
chainedStateHandle.getLength(); ++j) {
+                                               OperatorStateHandle stateHandle 
= chainedStateHandle.get(j);
+                                               
serializePartitionableStateHandle(stateHandle, dos);
+                                       }
+                               }
+
+                               // Keyed state
                                Map<Integer, KeyGroupsStateHandle> 
keyGroupsStateHandles = taskState.getKeyGroupsStateHandles();
                                dos.writeInt(keyGroupsStateHandles.size());
                                for (Map.Entry<Integer, KeyGroupsStateHandle> 
entry : keyGroupsStateHandles.entrySet()) {
@@ -119,9 +138,10 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                        JobVertexID jobVertexId = new 
JobVertexID(dis.readLong(), dis.readLong());
                        int parallelism = dis.readInt();
                        int maxParallelism = dis.readInt();
+                       int chainLength = dis.readInt();
 
                        // Add task state
-                       TaskState taskState = new TaskState(jobVertexId, 
parallelism, maxParallelism);
+                       TaskState taskState = new TaskState(jobVertexId, 
parallelism, maxParallelism, chainLength);
                        taskStates.add(taskState);
 
                        // Sub task states
@@ -142,6 +162,24 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                                taskState.putState(subtaskIndex, subtaskState);
                        }
 
+                       int numPartitionableOpStates = dis.readInt();
+
+                       for (int j = 0; j < numPartitionableOpStates; j++) {
+                               int subtaskIndex = dis.readInt();
+                               int chainedStateHandleSize = dis.readInt();
+                               List<OperatorStateHandle> streamStateHandleList 
= new ArrayList<>(chainedStateHandleSize);
+
+                               for (int k = 0; k < chainedStateHandleSize; 
++k) {
+                                       OperatorStateHandle streamStateHandle = 
deserializePartitionableStateHandle(dis);
+                                       
streamStateHandleList.add(streamStateHandle);
+                               }
+
+                               ChainedStateHandle<OperatorStateHandle> 
chainedStateHandle =
+                                               new 
ChainedStateHandle<>(streamStateHandleList);
+
+                               taskState.putPartitionableState(subtaskIndex, 
chainedStateHandle);
+                       }
+
                        // Key group states
                        int numKeyGroupStates = dis.readInt();
                        for (int j = 0; j < numKeyGroupStates; j++) {
@@ -157,7 +195,9 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                return new SavepointV1(checkpointId, taskStates);
        }
 
-       public static void serializeKeyGroupStateHandle(KeyGroupsStateHandle 
stateHandle, DataOutputStream dos) throws IOException {
+       public static void serializeKeyGroupStateHandle(
+                       KeyGroupsStateHandle stateHandle, DataOutputStream dos) 
throws IOException {
+
                if (stateHandle != null) {
                        dos.writeByte(KEY_GROUPS_HANDLE);
                        
dos.writeInt(stateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup());
@@ -172,10 +212,10 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
        }
 
        public static KeyGroupsStateHandle 
deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
-               int type = dis.readByte();
+               final int type = dis.readByte();
                if (NULL_HANDLE == type) {
                        return null;
-               } else {
+               } else if (KEY_GROUPS_HANDLE == type) {
                        int startKeyGroup = dis.readInt();
                        int numKeyGroups = dis.readInt();
                        KeyGroupRange keyGroupRange = 
KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
@@ -186,6 +226,53 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                        KeyGroupRangeOffsets keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(keyGroupRange, offsets);
                        StreamStateHandle stateHandle = 
deserializeStreamStateHandle(dis);
                        return new KeyGroupsStateHandle(keyGroupRangeOffsets, 
stateHandle);
+               } else {
+                       throw new IllegalStateException("Reading invalid 
KeyGroupsStateHandle, type: " + type);
+               }
+       }
+
+       /**
+        * Writes an {@link OperatorStateHandle} to the given stream.
+        *
+        * <p>Layout: a type tag byte, the number of map entries, then for every entry the state
+        * name (UTF), the number of offsets and the offsets themselves, followed by the delegate
+        * stream state handle. A {@code null} handle is written as the {@code NULL_HANDLE} tag only.
+        *
+        * @param stateHandle the handle to write, may be null
+        * @param dos the stream to write to
+        * @throws IOException if writing to the stream fails
+        */
+       public static void serializePartitionableStateHandle(
+                       OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+               // Absent handles are encoded by the tag alone.
+               if (stateHandle == null) {
+                       dos.writeByte(NULL_HANDLE);
+                       return;
+               }
+
+               dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
+
+               final Map<String, long[]> offsetsByStateName = stateHandle.getStateNameToPartitionOffsets();
+               dos.writeInt(offsetsByStateName.size());
+               for (Map.Entry<String, long[]> namedOffsets : offsetsByStateName.entrySet()) {
+                       dos.writeUTF(namedOffsets.getKey());
+
+                       final long[] partitionOffsets = namedOffsets.getValue();
+                       dos.writeInt(partitionOffsets.length);
+                       for (long offset : partitionOffsets) {
+                               dos.writeLong(offset);
+                       }
+               }
+
+               serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
+       }
+
+       /**
+        * Reads an {@link OperatorStateHandle} from the given stream.
+        *
+        * <p>The first byte is the type tag; the remaining layout is: number of map entries, then
+        * per entry the state name (UTF), the number of offsets and the offsets, followed by the
+        * delegate stream state handle.
+        *
+        * @param dis the stream to read from
+        * @return the decoded handle, or {@code null} if the {@code NULL_HANDLE} tag was read
+        * @throws IOException if reading from the stream fails
+        * @throws IllegalStateException if the type tag is neither of the two expected tags
+        */
+       public static OperatorStateHandle deserializePartitionableStateHandle(
+                       DataInputStream dis) throws IOException {
+
+               final int type = dis.readByte();
+               if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
+                       final int numStates = dis.readInt();
+                       final Map<String, long[]> offsetsByStateName = new HashMap<>(numStates);
+                       for (int state = 0; state < numStates; ++state) {
+                               final String stateName = dis.readUTF();
+                               final long[] partitionOffsets = new long[dis.readInt()];
+                               for (int pos = 0; pos < partitionOffsets.length; ++pos) {
+                                       partitionOffsets[pos] = dis.readLong();
+                               }
+                               offsetsByStateName.put(stateName, partitionOffsets);
+                       }
+                       final StreamStateHandle delegate = deserializeStreamStateHandle(dis);
+                       return new OperatorStateHandle(delegate, offsetsByStateName);
+               } else if (NULL_HANDLE == type) {
+                       return null;
+               } else {
+                       throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index ca976e4..7bbdb2a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.SerializedValue;
 
@@ -100,6 +101,8 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
        /** Handle to the key-grouped state of the head operator in the chain */
        private final List<KeyGroupsStateHandle> keyGroupState;
 
+       private final List<Collection<OperatorStateHandle>> 
partitionableOperatorState;
+
        /** The execution configuration (see {@link ExecutionConfig}) related 
to the specific job. */
        private final SerializedValue<ExecutionConfig> 
serializedExecutionConfig;
 
@@ -107,26 +110,27 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
         * Constructs a task deployment descriptor.
         */
        public TaskDeploymentDescriptor(
-                       JobID jobID,
-                       String jobName,
-                       JobVertexID vertexID,
-                       ExecutionAttemptID executionId,
-                       SerializedValue<ExecutionConfig> 
serializedExecutionConfig,
-                       String taskName,
-                       int numberOfKeyGroups,
-                       int indexInSubtaskGroup,
-                       int numberOfSubtasks,
-                       int attemptNumber,
-                       Configuration jobConfiguration,
-                       Configuration taskConfiguration,
-                       String invokableClassName,
-                       List<ResultPartitionDeploymentDescriptor> 
producedPartitions,
-                       List<InputGateDeploymentDescriptor> inputGates,
-                       List<BlobKey> requiredJarFiles,
-                       List<URL> requiredClasspaths,
-                       int targetSlotNumber,
-                       ChainedStateHandle<StreamStateHandle> operatorState,
-                       List<KeyGroupsStateHandle> keyGroupState) {
+               JobID jobID,
+               String jobName,
+               JobVertexID vertexID,
+               ExecutionAttemptID executionId,
+               SerializedValue<ExecutionConfig> serializedExecutionConfig,
+               String taskName,
+               int numberOfKeyGroups,
+               int indexInSubtaskGroup,
+               int numberOfSubtasks,
+               int attemptNumber,
+               Configuration jobConfiguration,
+               Configuration taskConfiguration,
+               String invokableClassName,
+               List<ResultPartitionDeploymentDescriptor> producedPartitions,
+               List<InputGateDeploymentDescriptor> inputGates,
+               List<BlobKey> requiredJarFiles,
+               List<URL> requiredClasspaths,
+               int targetSlotNumber,
+               ChainedStateHandle<StreamStateHandle> operatorState,
+               List<KeyGroupsStateHandle> keyGroupState,
+               List<Collection<OperatorStateHandle>> 
partitionableOperatorStateHandles) {
 
                checkArgument(indexInSubtaskGroup >= 0);
                checkArgument(numberOfSubtasks > indexInSubtaskGroup);
@@ -153,6 +157,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                this.targetSlotNumber = targetSlotNumber;
                this.operatorState = operatorState;
                this.keyGroupState = keyGroupState;
+               this.partitionableOperatorState = 
partitionableOperatorStateHandles;
        }
 
        public TaskDeploymentDescriptor(
@@ -195,6 +200,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                        requiredClasspaths,
                        targetSlotNumber,
                        null,
+                       null,
                        null);
        }
 
@@ -347,4 +353,8 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
        public List<KeyGroupsStateHandle> getKeyGroupState() {
                return keyGroupState;
        }
+
+       public List<Collection<OperatorStateHandle>> 
getPartitionableOperatorState() {
+               return partitionableOperatorState;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 273c0d9..f6cde95 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -34,13 +34,10 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
@@ -187,12 +184,8 @@ public interface Environment {
         * the checkpoint with the give checkpoint-ID. This method does include
         * the given state in the checkpoint.
         *
-        * @param checkpointId
-        *             The ID of the checkpoint.
-        * @param chainedStateHandle
-        *             Handle for the chained operator state
-        * @param keyGroupStateHandles
-        *             Handles for key group state
+        * @param checkpointId The ID of the checkpoint.
+        * @param checkpointStateHandles All state handles for the checkpointed 
state
         * @param synchronousDurationMillis
         *             The duration (in milliseconds) of the synchronous part 
of the operator checkpoint
         * @param asynchronousDurationMillis
@@ -204,8 +197,7 @@ public interface Environment {
         */
        void acknowledgeCheckpoint(
                        long checkpointId,
-                       ChainedStateHandle<StreamStateHandle> 
chainedStateHandle,
-                       List<KeyGroupsStateHandle> keyGroupStateHandles,
+                       CheckpointStateHandles checkpointStateHandles,
                        long synchronousDurationMillis,
                        long asynchronousDurationMillis,
                        long bytesBufferedInAlignment,

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 912ff10..b92e3af 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -46,6 +47,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -56,6 +58,7 @@ import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -134,6 +137,8 @@ public class Execution {
 
        private ChainedStateHandle<StreamStateHandle> chainedStateHandle;
 
+       private List<Collection<OperatorStateHandle>> 
chainedPartitionableStateHandle;
+
        private List<KeyGroupsStateHandle> keyGroupsStateHandles;
        
 
@@ -223,6 +228,10 @@ public class Execution {
                return keyGroupsStateHandles;
        }
 
+       public List<Collection<OperatorStateHandle>> 
getChainedPartitionableStateHandle() {
+               return chainedPartitionableStateHandle;
+       }
+
        public boolean isFinished() {
                return state.isTerminal();
        }
@@ -246,18 +255,19 @@ public class Execution {
         * Sets the initial state for the execution. The serialized state is 
then shipped via the
         * {@link TaskDeploymentDescriptor} to the TaskManagers.
         *
-        * @param chainedStateHandle Chained operator state
-        * @param keyGroupsStateHandles Key-group state (= partitioned state)
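+        * @param checkpointStateHandles all checkpointed operator state; if {@code null}, no state (including the partitionable state) is assigned
+        * @param chainedPartitionableStateHandle partitionable operator state handles to assign to this execution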
         */
-       public void setInitialState(
-               ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-                       List<KeyGroupsStateHandle> keyGroupsStateHandles) {
+       public void setInitialState(CheckpointStateHandles 
checkpointStateHandles, List<Collection<OperatorStateHandle>> 
chainedPartitionableStateHandle) {
 
                if (state != ExecutionState.CREATED) {
                        throw new IllegalArgumentException("Can only assign 
operator state when execution attempt is in CREATED");
                }
-               this.chainedStateHandle = chainedStateHandle;
-               this.keyGroupsStateHandles = keyGroupsStateHandles;
+
+               if(checkpointStateHandles != null) {
+                       this.chainedStateHandle = 
checkpointStateHandles.getNonPartitionedStateHandles();
+                       this.chainedPartitionableStateHandle = 
chainedPartitionableStateHandle;
+                       this.keyGroupsStateHandles = 
checkpointStateHandles.getKeyGroupsStateHandle();
+               }
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -385,6 +395,7 @@ public class Execution {
                                slot,
                                chainedStateHandle,
                                keyGroupsStateHandles,
+                               chainedPartitionableStateHandle,
                                attemptNumber);
 
                        // register this execution at the execution graph, to 
receive call backs

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 7c3fa0b..6023205 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -56,10 +56,8 @@ import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a8d5ee4..4837803 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -53,6 +54,7 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.Serializable;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -629,6 +631,7 @@ public class ExecutionVertex {
                        SimpleSlot targetSlot,
                        ChainedStateHandle<StreamStateHandle> operatorState,
                        List<KeyGroupsStateHandle> keyGroupStates,
+                       List<Collection<OperatorStateHandle>> 
partitionableOperatorStateHandle,
                        int attemptNumber) {
 
                // Produced intermediate results
@@ -681,7 +684,8 @@ public class ExecutionVertex {
                        classpaths,
                        targetSlot.getRoot().getSlotNumber(),
                        operatorState,
-                       keyGroupStates);
+                       keyGroupStates,
+                       partitionableOperatorStateHandle);
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 9ddfdf7..55e3e09 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -20,8 +20,10 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
+import java.util.Collection;
 import java.util.List;
 
 /**
@@ -33,11 +35,16 @@ public interface StatefulTask {
        /**
         * Sets the initial state of the operator, upon recovery. The initial 
state is typically
         * a snapshot of the state from a previous execution.
-        * 
+        *
+        * TODO this should use {@link org.apache.flink.runtime.state.CheckpointStateHandles} after redoing chained state.
+        *
         * @param chainedState Handle for the chained operator states.
         * @param keyGroupsState Handle for key group states.
         */
-       void setInitialState(ChainedStateHandle<StreamStateHandle> 
chainedState, List<KeyGroupsStateHandle> keyGroupsState) throws Exception;
+       void setInitialState(
+               ChainedStateHandle<StreamStateHandle> chainedState,
+               List<KeyGroupsStateHandle> keyGroupsState,
+               List<Collection<OperatorStateHandle>> 
partitionableOperatorState) throws Exception;
 
        /**
         * This method is called to trigger a checkpoint, asynchronously by the 
checkpoint

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index 72396eb..e95e7b3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -20,11 +20,7 @@ package org.apache.flink.runtime.messages.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-
-import java.util.List;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -32,7 +28,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  * This message is sent from the {@link 
org.apache.flink.runtime.taskmanager.TaskManager} to the
  * {@link org.apache.flink.runtime.jobmanager.JobManager} to signal that the 
checkpoint of an
  * individual task is completed.
- * 
+ *
  * <p>This message may carry the handle to the task's chained operator state 
and the key group
  * state.
  */
@@ -40,9 +36,8 @@ public class AcknowledgeCheckpoint extends 
AbstractCheckpointMessage implements
 
        private static final long serialVersionUID = -7606214777192401493L;
 
-       private final ChainedStateHandle<StreamStateHandle> stateHandle;
 
-       private final List<KeyGroupsStateHandle> keyGroupsStateHandle;
+       private final CheckpointStateHandles checkpointStateHandles;
 
        /** The duration (in milliseconds) that the synchronous part of the 
checkpoint took */
        private final long synchronousDurationMillis;
@@ -62,24 +57,22 @@ public class AcknowledgeCheckpoint extends 
AbstractCheckpointMessage implements
                        JobID job,
                        ExecutionAttemptID taskExecutionId,
                        long checkpointId) {
-               this(job, taskExecutionId, checkpointId, null, null);
+               this(job, taskExecutionId, checkpointId, null);
        }
 
        public AcknowledgeCheckpoint(
                        JobID job,
                        ExecutionAttemptID taskExecutionId,
                        long checkpointId,
-                       ChainedStateHandle<StreamStateHandle> state,
-                       List<KeyGroupsStateHandle> keyGroupStateAndSizes) {
-               this(job, taskExecutionId, checkpointId, state, 
keyGroupStateAndSizes, -1L, -1L, -1L, -1L);
+                       CheckpointStateHandles checkpointStateHandles) {
+               this(job, taskExecutionId, checkpointId, 
checkpointStateHandles, -1L, -1L, -1L, -1L);
        }
 
        public AcknowledgeCheckpoint(
                        JobID job,
                        ExecutionAttemptID taskExecutionId,
                        long checkpointId,
-                       ChainedStateHandle<StreamStateHandle> state,
-                       List<KeyGroupsStateHandle> keyGroupStateAndSizes,
+                       CheckpointStateHandles checkpointStateHandles,
                        long synchronousDurationMillis,
                        long asynchronousDurationMillis,
                        long bytesBufferedInAlignment,
@@ -87,9 +80,7 @@ public class AcknowledgeCheckpoint extends 
AbstractCheckpointMessage implements
 
                super(job, taskExecutionId, checkpointId);
 
-               // these may be null in cases where the operator has no state
-               this.stateHandle = state;
-               this.keyGroupsStateHandle = keyGroupStateAndSizes;
+               this.checkpointStateHandles = checkpointStateHandles;
 
                // these may be "-1", in case the values are unknown or not set
                checkArgument(synchronousDurationMillis >= -1);
@@ -107,12 +98,8 @@ public class AcknowledgeCheckpoint extends 
AbstractCheckpointMessage implements
        //  properties
        // 
------------------------------------------------------------------------
 
-       public ChainedStateHandle<StreamStateHandle> getStateHandle() {
-               return stateHandle;
-       }
-
-       public List<KeyGroupsStateHandle> getKeyGroupsStateHandle() {
-               return keyGroupsStateHandle;
+       public CheckpointStateHandles getCheckpointStateHandles() {
+               return checkpointStateHandles;
        }
 
        public long getSynchronousDurationMillis() {
@@ -134,31 +121,33 @@ public class AcknowledgeCheckpoint extends 
AbstractCheckpointMessage implements
        // 
--------------------------------------------------------------------------------------------
 
        @Override
-       public int hashCode() {
-               return super.hashCode();
-       }
-
-       @Override
        public boolean equals(Object o) {
                if (this == o) {
-                       return true ;
+                       return true;
                }
-               else if (o instanceof AcknowledgeCheckpoint) {
-                       AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o;
-                       return super.equals(o) &&
-                                       (this.stateHandle == null ? 
that.stateHandle == null : 
-                                                       (that.stateHandle != 
null && this.stateHandle.equals(that.stateHandle))) &&
-                                       (this.keyGroupsStateHandle == null ? 
that.keyGroupsStateHandle == null : 
-                                                       
(that.keyGroupsStateHandle != null && 
this.keyGroupsStateHandle.equals(that.keyGroupsStateHandle)));
+               if (!(o instanceof AcknowledgeCheckpoint)) {
+                       return false;
                }
-               else {
+               if (!super.equals(o)) {
                        return false;
                }
+
+               AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o;
+               return checkpointStateHandles != null ?
+                               
checkpointStateHandles.equals(that.checkpointStateHandles) : 
that.checkpointStateHandles == null;
+
+       }
+
+       @Override
+       public int hashCode() {
+               int result = super.hashCode();
+               result = 31 * result + (checkpointStateHandles != null ? 
checkpointStateHandles.hashCode() : 0);
+               return result;
        }
 
        @Override
        public String toString() {
-               return String.format("Confirm Task Checkpoint %d for (%s/%s) - 
state=%s keyGroupState=%s",
-                               getCheckpointId(), getJob(), 
getTaskExecutionId(), stateHandle, keyGroupsStateHandle);
+               return String.format("Confirm Task Checkpoint %d for (%s/%s) - 
state=%s",
+                               getCheckpointId(), getJob(), 
getTaskExecutionId(), checkpointStateHandles);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
deleted file mode 100644
index 5966c95..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
-/**
- * A simple base for closable handles.
- *
- * Offers to register a stream (or other closable object) that close calls are 
delegated to if
- * the handle is closed or was already closed.
- */
-public abstract class AbstractCloseableHandle implements Closeable, 
StateObject {
-
-       /** Serial Version UID must be constant to maintain format 
compatibility */
-       private static final long serialVersionUID = 1L;
-
-       /** To atomically update the "closable" field without needing to add a 
member class like "AtomicBoolean */
-       private static final AtomicIntegerFieldUpdater<AbstractCloseableHandle> 
CLOSER =
-                       
AtomicIntegerFieldUpdater.newUpdater(AbstractCloseableHandle.class, "isClosed");
-
-       // 
------------------------------------------------------------------------
-
-       /** The closeable to close if this handle is closed late */
-       private transient volatile Closeable toClose;
-
-       /** Flag to remember if this handle was already closed */
-       @SuppressWarnings("unused") // this field is actually updated, but via 
the "CLOSER" updater
-       private transient volatile int isClosed;
-
-       // 
------------------------------------------------------------------------
-
-       protected final void registerCloseable(Closeable toClose) throws 
IOException {
-               if (toClose == null) {
-                       return;
-               }
-
-               // NOTE: The order of operations matters here:
-               // (1) first setting the closeable
-               // (2) checking the flag.
-               // Because the order in the {@link #close()} method is the 
opposite, and
-               // both variables are volatile (reordering barriers), we can be 
sure that
-               // one of the methods always notices the effect of a concurrent 
call to the
-               // other method.
-
-               this.toClose = toClose;
-
-               // check if we were closed early
-               if (this.isClosed != 0) {
-                       toClose.close();
-                       throw new IOException("handle is closed");
-               }
-       }
-
-       /**
-        * Closes the handle.
-        *
-        * <p>If a "Closeable" has been registered via {@link 
#registerCloseable(Closeable)},
-        * then this will be closes.
-        *
-        * <p>If any "Closeable" will be registered via {@link 
#registerCloseable(Closeable)} in the future,
-        * it will immediately be closed and that method will throw an 
exception.
-        *
-        * @throws IOException Exceptions occurring while closing an already 
registered {@code Closeable}
-        *                     are forwarded.
-        *
-        * @see #registerCloseable(Closeable)
-        */
-       @Override
-       public final void close() throws IOException {
-               // NOTE: The order of operations matters here:
-               // (1) first setting the closed flag
-               // (2) checking whether there is already a closeable
-               // Because the order in the {@link 
#registerCloseable(Closeable)} method is the opposite, and
-               // both variables are volatile (reordering barriers), we can be 
sure that
-               // one of the methods always notices the effect of a concurrent 
call to the
-               // other method.
-
-               if (CLOSER.compareAndSet(this, 0, 1)) {
-                       final Closeable toClose = this.toClose;
-                       if (toClose != null) {
-                               this.toClose = null;
-                               toClose.close();
-                       }
-               }
-       }
-
-       /**
-        * Checks whether this handle has been closed.
-        *
-        * @return True is the handle is closed, false otherwise.
-        */
-       public boolean isClosed() {
-               return isClosed != 0;
-       }
-
-       /**
-        * This method checks whether the handle is closed and throws an 
exception if it is closed.
-        * If the handle is not closed, this method does nothing.
-        *
-        * @throws IOException Thrown, if the handle has been closed.
-        */
-       public void ensureNotClosed() throws IOException {
-               if (isClosed != 0) {
-                       throw new IOException("handle is closed");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
new file mode 100644
index 0000000..7ca3b38
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateBackend;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Base implementation of KeyedStateBackend. The state can be checkpointed
+ * to streams using {@link #snapshot(long, long, CheckpointStreamFactory)}.
+ *
+ * @param <K> Type of the key by which state is keyed.
+ */
+public abstract class AbstractKeyedStateBackend<K>
+               implements KeyedStateBackend<K>, 
SnapshotProvider<KeyGroupsStateHandle>, Closeable {
+
+       /** {@link TypeSerializer} for our key. */
+       protected final TypeSerializer<K> keySerializer;
+
+       /** The currently active key. */
+       protected K currentKey;
+
+       /** The key group of the currently active key */
+       private int currentKeyGroup;
+
+       /** So that we can give out state when the user uses the same key. */
+       protected HashMap<String, KvState<?>> keyValueStatesByName;
+
+       /** For caching the last accessed partitioned state */
+       private String lastName;
+
+       @SuppressWarnings("rawtypes")
+       private KvState lastState;
+
+       /** The number of key-groups aka max parallelism */
+       protected final int numberOfKeyGroups;
+
+       /** Range of key-groups for which this backend is responsible */
+       protected final KeyGroupRange keyGroupRange;
+
+       /** KvStateRegistry helper for this task */
+       protected final TaskKvStateRegistry kvStateRegistry;
+
+       /** Registry for all opened streams, so they can be closed if the task 
using this backend is closed */
+       protected ClosableRegistry cancelStreamRegistry;
+
+       protected final ClassLoader userCodeClassLoader;
+
+       /**
+        * Creates the common base of a keyed state backend.
+        *
+        * @param kvStateRegistry registry helper used to publish queryable state; must not be null
+        * @param keySerializer serializer for the keys; must not be null
+        * @param userCodeClassLoader class loader for user code; must not be null
+        * @param numberOfKeyGroups number of key-groups (aka max parallelism); must be at least 1
+        * @param keyGroupRange range of key-groups for which this backend is responsible; must not be null
+        *
+        * @throws NullPointerException if one of the reference arguments is null
+        * @throws IllegalArgumentException if {@code numberOfKeyGroups} is smaller than 1
+        */
+       public AbstractKeyedStateBackend(
+                       TaskKvStateRegistry kvStateRegistry,
+                       TypeSerializer<K> keySerializer,
+                       ClassLoader userCodeClassLoader,
+                       int numberOfKeyGroups,
+                       KeyGroupRange keyGroupRange) {
+
+               this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
+               this.keySerializer = Preconditions.checkNotNull(keySerializer);
+
+               // numberOfKeyGroups is a primitive: checkNotNull would only box it and could never
+               // fail, so the value range is validated instead.
+               Preconditions.checkArgument(numberOfKeyGroups >= 1, "The number of key-groups must be at least 1.");
+               this.numberOfKeyGroups = numberOfKeyGroups;
+
+               this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+               this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
+               this.cancelStreamRegistry = new ClosableRegistry();
+       }
+
+       /**
+        * Closes the state backend, releasing all internal resources, but does not delete any persistent
+        * checkpoint data.
+        *
+        * <p>Unregisters everything from the {@link TaskKvStateRegistry} and drops the references to
+        * the created states, including the cache of the last accessed state.
+        */
+       @Override
+       public void dispose() {
+               if (kvStateRegistry != null) {
+                       kvStateRegistry.unregisterAll();
+               }
+
+               lastName = null;
+               lastState = null;
+               keyValueStatesByName = null;
+       }
+
+       /**
+        * Creates and returns a new {@link ValueState}.
+        *
+        * @param namespaceSerializer TypeSerializer for the state namespace.
+        * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+        *
+        * @param <N> The type of the namespace.
+        * @param <T> The type of the value that the {@code ValueState} can store.
+        *
+        * @return The newly created {@code ValueState}.
+        * @throws Exception Implementations may throw an exception if the state cannot be created.
+        */
+       protected abstract <N, T> ValueState<T> 
createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> 
stateDesc) throws Exception;
+
+       /**
+        * Creates and returns a new {@link ListState}.
+        *
+        * @param namespaceSerializer TypeSerializer for the state namespace.
+        * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+        *
+        * @param <N> The type of the namespace.
+        * @param <T> The type of the values that the {@code ListState} can store.
+        *
+        * @return The newly created {@code ListState}.
+        * @throws Exception Implementations may throw an exception if the state cannot be created.
+        */
+       protected abstract <N, T> ListState<T> 
createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> 
stateDesc) throws Exception;
+
+       /**
+        * Creates and returns a new {@link ReducingState}.
+        *
+        * @param namespaceSerializer TypeSerializer for the state namespace.
+        * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+        *
+        * @param <N> The type of the namespace.
+        * @param <T> The type of the values that the {@code ReducingState} can store.
+        *
+        * @return The newly created {@code ReducingState}.
+        * @throws Exception Implementations may throw an exception if the state cannot be created.
+        */
+       protected abstract <N, T> ReducingState<T> 
createReducingState(TypeSerializer<N> namespaceSerializer, 
ReducingStateDescriptor<T> stateDesc) throws Exception;
+
+       /**
+        * Creates and returns a new {@link FoldingState}.
+        *
+        * @param namespaceSerializer TypeSerializer for the state namespace.
+        * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+        *
+        * @param <N> The type of the namespace.
+        * @param <T> Type of the values folded into the state
+        * @param <ACC> Type of the value in the state
+        *
+        * @return The newly created {@code FoldingState}.
+        * @throws Exception Implementations may throw an exception if the state cannot be created.
+        */
+       protected abstract <N, T, ACC> FoldingState<T, ACC> 
createFoldingState(TypeSerializer<N> namespaceSerializer, 
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
+
+       /**
+        * Sets the currently active key and derives its key-group from the key and the number of
+        * key-groups via {@code KeyGroupRangeAssignment.assignToKeyGroup}.
+        *
+        * @param newKey The key to make the currently active key.
+        *
+        * @see KeyedStateBackend
+        */
+       @Override
+       public void setCurrentKey(K newKey) {
+               this.currentKey = newKey;
+               this.currentKeyGroup = 
KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups);
+       }
+
+       /**
+        * Returns the {@link TypeSerializer} for the keys of this backend.
+        *
+        * @see KeyedStateBackend
+        */
+       @Override
+       public TypeSerializer<K> getKeySerializer() {
+               return keySerializer;
+       }
+
+       /**
+        * Returns the currently active key, as last set via {@link #setCurrentKey(Object)}.
+        *
+        * @see KeyedStateBackend
+        */
+       @Override
+       public K getCurrentKey() {
+               return currentKey;
+       }
+
+       /**
+        * Returns the key-group of the currently active key, as computed by the last call to
+        * {@link #setCurrentKey(Object)}.
+        *
+        * @see KeyedStateBackend
+        */
+       @Override
+       public int getCurrentKeyGroupIndex() {
+               return currentKeyGroup;
+       }
+
+       /**
+        * Returns the number of key-groups (aka max parallelism) this backend was created with.
+        *
+        * @see KeyedStateBackend
+        */
+       @Override
+       public int getNumberOfKeyGroups() {
+               return numberOfKeyGroups;
+       }
+
+       /**
+        * Returns the range of key-groups for which this backend is responsible.
+        *
+        * @see KeyedStateBackend
+        */
+       public KeyGroupRange getKeyGroupRange() {
+               return keyGroupRange;
+       }
+
+       /**
+        * Returns the partitioned state for the given descriptor, with its current namespace set to
+        * the given namespace.
+        *
+        * <p>States are looked up by the name of the descriptor: the most recently accessed state is
+        * served from a one-element cache, previously created states from an internal map. Only if
+        * neither holds a state with that name, a new state is created by binding the descriptor to
+        * the {@code create*State} methods of this backend. A newly created state whose descriptor is
+        * queryable is additionally registered at the {@link TaskKvStateRegistry}.
+        *
+        * <p>Note that {@code namespaceSerializer} is only used when a state is newly created. If the
+        * serializer of the descriptor is not yet initialized, it is initialized with a fresh
+        * {@link ExecutionConfig}.
+        *
+        * @param namespace The namespace to set on the returned state; must not be null.
+        * @param namespaceSerializer The serializer for the namespace; must not be null.
+        * @param stateDescriptor The descriptor that identifies (by name) and creates the state.
+        *
+        * @param <N> The type of the namespace.
+        * @param <S> The type of the state.
+        *
+        * @return The state registered under the descriptor's name.
+        *
+        * @see KeyedStateBackend
+        */
+       @Override
+       @SuppressWarnings({"rawtypes", "unchecked"})
+       public <N, S extends State> S getPartitionedState(final N namespace, 
final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> 
stateDescriptor) throws Exception {
+               Preconditions.checkNotNull(namespace, "Namespace");
+               Preconditions.checkNotNull(namespaceSerializer, "Namespace 
serializer");
+
+               if (keySerializer == null) {
+                       throw new RuntimeException("State key serializer has 
not been configured in the config. " +
+                                       "This operation cannot use partitioned 
state.");
+               }
+               
+               if (!stateDescriptor.isSerializerInitialized()) {
+                       stateDescriptor.initializeSerializerUnlessSet(new 
ExecutionConfig());
+               }
+
+               if (keyValueStatesByName == null) {
+                       keyValueStatesByName = new HashMap<>();
+               }
+
+               if (lastName != null && 
lastName.equals(stateDescriptor.getName())) {
+                       lastState.setCurrentNamespace(namespace);
+                       return (S) lastState;
+               }
+
+               KvState<?> previous = 
keyValueStatesByName.get(stateDescriptor.getName());
+               if (previous != null) {
+                       lastState = previous;
+                       lastState.setCurrentNamespace(namespace);
+                       lastName = stateDescriptor.getName();
+                       return (S) previous;
+               }
+
+               // no state with this name exists yet: create a new blank key/value state by letting the
+               // descriptor call back into the matching create*State method of this backend
+               S state = stateDescriptor.bind(new StateBackend() {
+                       @Override
+                       public <T> ValueState<T> 
createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
+                               return 
AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
+                       }
+
+                       @Override
+                       public <T> ListState<T> 
createListState(ListStateDescriptor<T> stateDesc) throws Exception {
+                               return 
AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
+                       }
+
+                       @Override
+                       public <T> ReducingState<T> 
createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
+                               return 
AbstractKeyedStateBackend.this.createReducingState(namespaceSerializer, 
stateDesc);
+                       }
+
+                       @Override
+                       public <T, ACC> FoldingState<T, ACC> 
createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+                               return 
AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, 
stateDesc);
+                       }
+
+               });
+
+               KvState kvState = (KvState) state;
+
+               keyValueStatesByName.put(stateDescriptor.getName(), kvState);
+
+               lastName = stateDescriptor.getName();
+               lastState = kvState;
+
+               kvState.setCurrentNamespace(namespace);
+
+               // Publish queryable state under the descriptor's queryable state name
+               if (stateDescriptor.isQueryable()) {
+                       if (kvStateRegistry == null) {
+                               throw new IllegalStateException("State backend 
has not been initialized for job.");
+                       }
+
+                       String name = stateDescriptor.getQueryableStateName();
+                       kvStateRegistry.registerKvState(keyGroupRange, name, 
kvState);
+               }
+
+               return state;
+       }
+
+       /**
+        * Merges the state of all {@code sources} namespaces into the {@code target} namespace.
+        *
+        * <p>Every source namespace is read and then cleared; the combined result is added to the
+        * target namespace afterwards. For a {@link ReducingStateDescriptor} the source values are
+        * combined with the descriptor's reduce function, for a {@link ListStateDescriptor} all
+        * elements are collected in iteration order. Other descriptors are not supported.
+        *
+        * @param target The namespace that receives the merged state.
+        * @param sources The namespaces whose state is merged and cleared.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDescriptor The descriptor of the state to merge.
+        *
+        * @param <N> The type of the namespace.
+        * @param <S> The type of the state.
+        *
+        * @throws RuntimeException If the descriptor is neither a reducing nor a list state descriptor.
+        * @throws Exception Exceptions from accessing the state or from the reduce function are forwarded.
+        */
+       @Override
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public <N, S extends MergingState<?, ?>> void mergePartitionedStates(
+                       final N target,
+                       Collection<N> sources,
+                       final TypeSerializer<N> namespaceSerializer,
+                       final StateDescriptor<S, ?> stateDescriptor) throws Exception {
+
+               if (stateDescriptor instanceof ReducingStateDescriptor) {
+                       ReducingStateDescriptor reducingStateDescriptor = (ReducingStateDescriptor) stateDescriptor;
+                       ReduceFunction reduceFn = reducingStateDescriptor.getReduceFunction();
+                       ReducingState state = (ReducingState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
+                       KvState kvState = (KvState) state;
+                       Object result = null;
+                       for (N source : sources) {
+                               kvState.setCurrentNamespace(source);
+                               // read the source value only once and reuse it below
+                               Object sourceValue = state.get();
+                               if (result == null) {
+                                       result = sourceValue;
+                               } else if (sourceValue != null) {
+                                       result = reduceFn.reduce(result, sourceValue);
+                               }
+                               state.clear();
+                       }
+                       // write the merged value only after all sources have been read and cleared
+                       kvState.setCurrentNamespace(target);
+                       if (result != null) {
+                               state.add(result);
+                       }
+               } else if (stateDescriptor instanceof ListStateDescriptor) {
+                       ListState<Object> state = (ListState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
+                       KvState kvState = (KvState) state;
+                       List<Object> result = new ArrayList<>();
+                       for (N source : sources) {
+                               kvState.setCurrentNamespace(source);
+                               Iterable<Object> sourceValue = state.get();
+                               if (sourceValue != null) {
+                                       for (Object o : sourceValue) {
+                                               result.add(o);
+                                       }
+                               }
+                               state.clear();
+                       }
+                       // write the collected elements only after all sources have been read and cleared
+                       kvState.setCurrentNamespace(target);
+                       for (Object o : result) {
+                               state.add(o);
+                       }
+               } else {
+                       throw new RuntimeException("Cannot merge states for " + stateDescriptor);
+               }
+       }
+
+       @Override
+       public void close() throws IOException {
+               cancelStreamRegistry.close(); // closes the registry that tracks the streams opened through this backend
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 0d2bf45..c2e665b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 
 /**
@@ -36,31 +37,33 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
         * Creates a {@link CheckpointStreamFactory} that can be used to create 
streams
         * that should end up in a checkpoint.
         *
-        * @param jobId The {@link JobID} of the job for which we are creating 
checkpoint streams.
+        * @param jobId              The {@link JobID} of the job for which we 
are creating checkpoint streams.
         * @param operatorIdentifier An identifier of the operator for which we 
create streams.
         */
        public abstract CheckpointStreamFactory createStreamFactory(
                        JobID jobId,
-                       String operatorIdentifier) throws IOException;
+                       String operatorIdentifier
+       ) throws IOException;
 
        /**
-        * Creates a new {@link KeyedStateBackend} that is responsible for 
keeping keyed state
+        * Creates a new {@link AbstractKeyedStateBackend} that is responsible 
for keeping keyed state
         * and can be checkpointed to checkpoint streams.
         */
-       public abstract <K> KeyedStateBackend<K> createKeyedStateBackend(
+       public abstract <K> AbstractKeyedStateBackend<K> 
createKeyedStateBackend(
                        Environment env,
                        JobID jobID,
                        String operatorIdentifier,
                        TypeSerializer<K> keySerializer,
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry) throws Exception;
+                       TaskKvStateRegistry kvStateRegistry
+       ) throws Exception;
 
        /**
-        * Creates a new {@link KeyedStateBackend} that restores its state from 
the given list
+        * Creates a new {@link AbstractKeyedStateBackend} that restores its 
state from the given list
         * {@link KeyGroupsStateHandle KeyGroupStateHandles}.
         */
-       public abstract <K> KeyedStateBackend<K> restoreKeyedStateBackend(
+       public abstract <K> AbstractKeyedStateBackend<K> 
restoreKeyedStateBackend(
                        Environment env,
                        JobID jobID,
                        String operatorIdentifier,
@@ -68,6 +71,30 @@ public abstract class AbstractStateBackend implements 
java.io.Serializable {
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        List<KeyGroupsStateHandle> restoredState,
-                       TaskKvStateRegistry kvStateRegistry) throws Exception;
+                       TaskKvStateRegistry kvStateRegistry
+       ) throws Exception;
 
+
+       /**
+        * Creates a new {@link OperatorStateBackend} that can be used for storing partitionable operator
+        * state in checkpoint streams.
+        *
+        * <p>This default implementation ignores both arguments and returns a new
+        * {@link DefaultOperatorStateBackend} created through its no-argument constructor.
+        *
+        * @param env                The environment handed in by the caller (not used by this default implementation).
+        * @param operatorIdentifier An identifier of the operator (not used by this default implementation).
+        * @return A new {@link DefaultOperatorStateBackend} without restore snapshots.
+        * @throws Exception NOTE(review): presumably declared so that overriding backends may fail - confirm.
+        */
+       public OperatorStateBackend createOperatorStateBackend(
+                       Environment env,
+                       String operatorIdentifier
+       ) throws Exception {
+               return new DefaultOperatorStateBackend();
+       }
+
+       /**
+        * Creates a new {@link OperatorStateBackend} that restores its state from the given collection of
+        * {@link OperatorStateHandle}.
+        *
+        * <p>This default implementation only uses {@code restoreSnapshots}; it passes the collection
+        * to the {@link DefaultOperatorStateBackend} constructor and ignores the other arguments.
+        *
+        * @param env                The environment handed in by the caller (not used by this default implementation).
+        * @param operatorIdentifier An identifier of the operator (not used by this default implementation).
+        * @param restoreSnapshots   The state handles the returned backend restores from.
+        * @return A new {@link DefaultOperatorStateBackend} holding {@code restoreSnapshots}.
+        * @throws Exception NOTE(review): presumably declared so that overriding backends may fail - confirm.
+        */
+       public OperatorStateBackend restoreOperatorStateBackend(
+                       Environment env,
+                       String operatorIdentifier,
+                       Collection<OperatorStateHandle> restoreSnapshots
+       ) throws Exception {
+               return new DefaultOperatorStateBackend(restoreSnapshots);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
index 74057ee..c6904c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
@@ -26,7 +26,7 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * Handle to the non-partitioned states for the operators in an operator chain.
+ * Handle to state handles for the operators in an operator chain.
  */
 public class ChainedStateHandle<T extends StateObject> implements StateObject {
 
@@ -123,9 +123,4 @@ public class ChainedStateHandle<T extends StateObject> 
implements StateObject {
        public static <T extends StateObject> ChainedStateHandle<T> 
wrapSingleHandle(T stateHandleToWrap) {
                return new 
ChainedStateHandle<T>(Collections.singletonList(stateHandleToWrap));
        }
-
-       @Override
-       public void close() throws IOException {
-               StateUtil.bestEffortCloseAllStateObjects(operatorStateHandles);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java
new file mode 100644
index 0000000..9daf963
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Container that holds all state handles of the different state types (non-partitioned, partitionable, and key-group state) of a checkpointed state.
+ * TODO This will be changed in the future if we get rid of chained state and instead connect state directly to individual operators in a chain.
+ */
+public class CheckpointStateHandles implements Serializable {
+
+       private static final long serialVersionUID = 3252351989995L;
+
+       /** Chained handles to the non-partitioned state; may be null. */
+       private final ChainedStateHandle<StreamStateHandle> nonPartitionedStateHandles;
+
+       /** Chained handles to the partitionable operator state; may be null. */
+       private final ChainedStateHandle<OperatorStateHandle> partitioneableStateHandles;
+
+       /** Handles to the key-group state; may be null. */
+       private final List<KeyGroupsStateHandle> keyGroupsStateHandle;
+
+       /**
+        * Bundles the given handles. The references are stored as passed in, without copying
+        * and without null checks.
+        */
+       public CheckpointStateHandles(
+                       ChainedStateHandle<StreamStateHandle> nonPartitionedStateHandles,
+                       ChainedStateHandle<OperatorStateHandle> partitioneableStateHandles,
+                       List<KeyGroupsStateHandle> keyGroupsStateHandle) {
+
+               this.keyGroupsStateHandle = keyGroupsStateHandle;
+               this.partitioneableStateHandles = partitioneableStateHandles;
+               this.nonPartitionedStateHandles = nonPartitionedStateHandles;
+       }
+
+       /** Returns the non-partitioned state handles exactly as passed to the constructor. */
+       public ChainedStateHandle<StreamStateHandle> getNonPartitionedStateHandles() {
+               return nonPartitionedStateHandles;
+       }
+
+       /** Returns the partitionable state handles exactly as passed to the constructor. */
+       public ChainedStateHandle<OperatorStateHandle> getPartitioneableStateHandles() {
+               return partitioneableStateHandles;
+       }
+
+       /** Returns the key-group state handles exactly as passed to the constructor. */
+       public List<KeyGroupsStateHandle> getKeyGroupsStateHandle() {
+               return keyGroupsStateHandle;
+       }
+
+       /**
+        * Two instances are equal when all three handle fields are pairwise equal,
+        * where two null fields count as equal.
+        */
+       @Override
+       public boolean equals(Object o) {
+               if (o == this) {
+                       return true;
+               }
+               if (o instanceof CheckpointStateHandles) {
+                       CheckpointStateHandles other = (CheckpointStateHandles) o;
+                       return nullSafeEquals(nonPartitionedStateHandles, other.nonPartitionedStateHandles)
+                                       && nullSafeEquals(partitioneableStateHandles, other.partitioneableStateHandles)
+                                       && nullSafeEquals(keyGroupsStateHandle, other.keyGroupsStateHandle);
+               }
+               return false;
+       }
+
+       /** Combines the hash codes of the three fields with the factor 31; a null field contributes 0. */
+       @Override
+       public int hashCode() {
+               int hash = nullSafeHash(nonPartitionedStateHandles);
+               hash = 31 * hash + nullSafeHash(partitioneableStateHandles);
+               hash = 31 * hash + nullSafeHash(keyGroupsStateHandle);
+               return hash;
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder text = new StringBuilder("CheckpointStateHandles{");
+               text.append("nonPartitionedStateHandles=").append(nonPartitionedStateHandles);
+               text.append(", partitioneableStateHandles=").append(partitioneableStateHandles);
+               text.append(", keyGroupsStateHandle=").append(keyGroupsStateHandle);
+               text.append('}');
+               return text.toString();
+       }
+
+       /** Null-tolerant equality: two nulls are equal, otherwise delegates to {@code left.equals(right)}. */
+       private static boolean nullSafeEquals(Object left, Object right) {
+               if (left == null) {
+                       return right == null;
+               }
+               return left.equals(right);
+       }
+
+       /** Null-tolerant hash code: 0 for null, otherwise {@code value.hashCode()}. */
+       private static int nullSafeHash(Object value) {
+               if (value == null) {
+                       return 0;
+               }
+               return value.hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
new file mode 100644
index 0000000..26d6192
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A registry of {@link Closeable} objects that are all closed when the registry itself is closed.
+ *
+ * <p>The set of registered objects and the {@code closed} flag are only accessed while holding
+ * a single lock (the set itself).
+ */
+public class ClosableRegistry implements Closeable {
+
+       /** The currently registered closeables; this object also serves as the lock. */
+       private final Set<Closeable> registeredCloseables;
+
+       /** Becomes true once {@link #close()} was called; guarded by the lock. */
+       private boolean closed;
+
+       public ClosableRegistry() {
+               this.registeredCloseables = new HashSet<>();
+               this.closed = false;
+       }
+
+       /**
+        * Registers a closeable so that it is closed together with this registry.
+        *
+        * @param closeable the object to register; null is ignored
+        * @return true if the object was newly added, false if it was null or already registered
+        * @throws IllegalStateException if the registry has already been closed
+        */
+       public boolean registerClosable(Closeable closeable) {
+
+               if (null == closeable) {
+                       return false;
+               }
+
+               synchronized (getSynchronizationLock()) {
+                       if (closed) {
+                               throw new IllegalStateException("Cannot register Closable, registry is already closed.");
+                       }
+
+                       return registeredCloseables.add(closeable);
+               }
+       }
+
+       /**
+        * Removes a closeable from the registry without closing it.
+        *
+        * @param closeable the object to remove; null is ignored
+        * @return true if the object was registered and has been removed, false otherwise
+        */
+       public boolean unregisterClosable(Closeable closeable) {
+
+               if (null == closeable) {
+                       return false;
+               }
+
+               synchronized (getSynchronizationLock()) {
+                       return registeredCloseables.remove(closeable);
+               }
+       }
+
+       /**
+        * Marks the registry as closed and quietly closes every closeable that is registered at
+        * that moment. Calling this method again has no further effect.
+        */
+       @Override
+       public void close() throws IOException {
+
+               final Closeable[] toClose;
+
+               synchronized (getSynchronizationLock()) {
+                       // Set the flag unconditionally: an empty registry must also reject later
+                       // registrations, otherwise objects registered after close() are never closed.
+                       closed = true;
+
+                       toClose = registeredCloseables.toArray(new Closeable[registeredCloseables.size()]);
+                       registeredCloseables.clear();
+               }
+
+               // Close a snapshot outside the lock, so that a close() implementation that calls
+               // back into this registry cannot modify the set while it is being iterated.
+               for (Closeable closeable : toClose) {
+                       IOUtils.closeQuietly(closeable);
+               }
+       }
+
+       /** Returns the object that guards the registered set and the {@code closed} flag. */
+       private Object getSynchronizationLock() {
+               return registeredCloseables;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
new file mode 100644
index 0000000..0bd5eeb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Default implementation of OperatorStateStore that provides the ability to 
make snapshots.
+ */
+public class DefaultOperatorStateBackend implements OperatorStateBackend {
+
+       /** All partitionable list states handed out so far, by state name. */
+       private final Map<String, PartitionableListState<?>> registeredStates;
+
+       /** Snapshots that states are restored from on first access; null if there is nothing to restore. */
+       private final Collection<OperatorStateHandle> restoreSnapshots;
+
+       /** Holds the streams that are open during restore/snapshot so that {@link #close()} can close them. */
+       private final ClosableRegistry closeStreamOnCancelRegistry;
+
+       /**
+        * Restores a OperatorStateStore (lazily) using the provided snapshots.
+        *
+        * @param restoreSnapshots snapshots that are available to restore partitionable states on request.
+        */
+       public DefaultOperatorStateBackend(
+                       Collection<OperatorStateHandle> restoreSnapshots) {
+               this.restoreSnapshots = restoreSnapshots;
+               this.registeredStates = new HashMap<>();
+               this.closeStreamOnCancelRegistry = new ClosableRegistry();
+       }
+
+       /**
+        * Creates an empty OperatorStateStore.
+        */
+       public DefaultOperatorStateBackend() {
+               this(null);
+       }
+
+       /**
+        * Returns the list state registered under the descriptor's name. On first access the state
+        * is created and, if restore snapshots were provided, filled with every partition that the
+        * snapshots hold for that name.
+        *
+        * @param stateDescriptor descriptor providing the state name and the element serializer
+        * @return the (possibly restored) list state for the given name
+        * @throws IOException if reading a restore snapshot fails; in that case no state is registered
+        *
+        * @see OperatorStateStore
+        */
+       @Override
+       public <S> ListState<S> getPartitionableState(
+                       ListStateDescriptor<S> stateDescriptor) throws IOException {
+
+               Preconditions.checkNotNull(stateDescriptor);
+
+               String name = Preconditions.checkNotNull(stateDescriptor.getName());
+               TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getSerializer());
+
+               @SuppressWarnings("unchecked")
+               PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name);
+
+               if (null == partitionableListState) {
+
+                       partitionableListState = new PartitionableListState<>(partitionStateSerializer);
+
+                       // Try to restore previous state if state handles to snapshots are provided
+                       if (restoreSnapshots != null) {
+                               for (OperatorStateHandle stateHandle : restoreSnapshots) {
+                                       restorePartitions(stateHandle, name, partitionStateSerializer, partitionableListState);
+                               }
+                       }
+
+                       // Register only after the restore completed: if reading a snapshot fails, no
+                       // partially restored state stays behind that later calls would silently return.
+                       registeredStates.put(name, partitionableListState);
+               }
+
+               return partitionableListState;
+       }
+
+       /**
+        * Reads all partitions stored under the given state name in one snapshot and appends them to the target state.
+        * Does nothing if the snapshot has no offsets for that name.
+        */
+       private <S> void restorePartitions(
+                       OperatorStateHandle stateHandle,
+                       String name,
+                       TypeSerializer<S> partitionStateSerializer,
+                       PartitionableListState<S> target) throws IOException {
+
+               long[] offsets = stateHandle.getStateNameToPartitionOffsets().get(name);
+
+               if (offsets == null) {
+                       return;
+               }
+
+               FSDataInputStream in = stateHandle.openInputStream();
+               try {
+                       // While registered, the stream is closed if this backend is closed during the restore.
+                       closeStreamOnCancelRegistry.registerClosable(in);
+
+                       DataInputView div = new DataInputViewStreamWrapper(in);
+
+                       for (long offset : offsets) {
+                               in.seek(offset);
+                               target.add(partitionStateSerializer.deserialize(div));
+                       }
+               } finally {
+                       closeStreamOnCancelRegistry.unregisterClosable(in);
+                       in.close();
+               }
+       }
+
+       /**
+        * Writes all registered states into one checkpoint stream. The work is done synchronously;
+        * the returned future is already completed.
+        *
+        * @return a completed future with the handle to the written state, or with null if no state is registered
+        *
+        * @see SnapshotProvider
+        */
+       @Override
+       public RunnableFuture<OperatorStateHandle> snapshot(
+                       long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
+
+               if (registeredStates.isEmpty()) {
+                       return new DoneFuture<>(null);
+               }
+
+               Map<String, long[]> writtenStatesMetaData = new HashMap<>(registeredStates.size());
+
+               CheckpointStreamFactory.CheckpointStateOutputStream out = streamFactory.
+                               createCheckpointStateOutputStream(checkpointId, timestamp);
+
+               try {
+                       // While registered, the stream is closed if this backend is closed during the snapshot.
+                       closeStreamOnCancelRegistry.registerClosable(out);
+
+                       DataOutputView dov = new DataOutputViewStreamWrapper(out);
+
+                       dov.writeInt(registeredStates.size());
+                       for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
+
+                               // Each state reports the stream position at which every one of its partitions starts.
+                               long[] partitionOffsets = entry.getValue().write(out);
+                               writtenStatesMetaData.put(entry.getKey(), partitionOffsets);
+                       }
+
+                       OperatorStateHandle handle = new OperatorStateHandle(out.closeAndGetHandle(), writtenStatesMetaData);
+
+                       return new DoneFuture<>(handle);
+               } finally {
+                       closeStreamOnCancelRegistry.unregisterClosable(out);
+                       out.close();
+               }
+       }
+
+       /** Intentionally empty: this backend performs no work on dispose. */
+       @Override
+       public void dispose() {
+
+       }
+
+       /**
+        * List state backed by an in-memory {@link ArrayList}; each element is written as one partition.
+        */
+       static final class PartitionableListState<S> implements ListState<S> {
+
+               private final List<S> listState;
+               private final TypeSerializer<S> partitionStateSerializer;
+
+               public PartitionableListState(TypeSerializer<S> partitionStateSerializer) {
+                       this.listState = new ArrayList<>();
+                       this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
+               }
+
+               @Override
+               public void clear() {
+                       listState.clear();
+               }
+
+               @Override
+               public Iterable<S> get() {
+                       return listState;
+               }
+
+               @Override
+               public void add(S value) {
+                       listState.add(value);
+               }
+
+               /**
+                * Serializes all elements to the stream, one after the other.
+                *
+                * @return for each element, the stream position at which its serialized form starts
+                */
+               public long[] write(FSDataOutputStream out) throws IOException {
+
+                       long[] partitionOffsets = new long[listState.size()];
+
+                       DataOutputView dov = new DataOutputViewStreamWrapper(out);
+
+                       for (int i = 0; i < listState.size(); ++i) {
+                               S element = listState.get(i);
+                               partitionOffsets[i] = out.getPos();
+                               partitionStateSerializer.serialize(element, dov);
+                       }
+
+                       return partitionOffsets;
+               }
+       }
+
+       /** Returns the names of all states registered so far, as the key set view of the internal map. */
+       @Override
+       public Set<String> getRegisteredStateNames() {
+               return registeredStates.keySet();
+       }
+
+       /** Closes the registry of open streams. */
+       @Override
+       public void close() throws IOException {
+               closeStreamOnCancelRegistry.close();
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
index 4f0a82b..8e7207e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
@@ -31,6 +31,8 @@ import java.util.Iterator;
  */
 public class KeyGroupRangeOffsets implements Iterable<Tuple2<Integer, Long>> , 
Serializable {
 
+       private static final long serialVersionUID = 6595415219136429696L;
+
        /** the range of key-groups */
        private final KeyGroupRange keyGroupRange;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 7f87e86..ea12808 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -138,7 +138,6 @@ public class KeyGroupsStateHandle implements StateObject {
                        return false;
                }
                return stateHandle.equals(that.stateHandle);
-
        }
 
        @Override
@@ -155,9 +154,4 @@ public class KeyGroupsStateHandle implements StateObject {
                                ", data=" + stateHandle +
                                '}';
        }
-
-       @Override
-       public void close() throws IOException {
-               stateHandle.close();
-       }
 }

Reply via email to