http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 5612f73..7293a84 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,178 +18,55 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MergingState;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateBackend;
 import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.RunnableFuture;
 
 /**
- * A keyed state backend is responsible for managing keyed state. The state 
can be checkpointed
- * to streams using {@link #snapshot(long, long, CheckpointStreamFactory)}.
+ * A keyed state backend provides methods for managing keyed state.
  *
  * @param <K> The key by which state is keyed.
  */
-public abstract class KeyedStateBackend<K> {
-
-       /** {@link TypeSerializer} for our key. */
-       protected final TypeSerializer<K> keySerializer;
-
-       /** The currently active key. */
-       protected K currentKey;
-
-       /** The key group of the currently active key */
-       private int currentKeyGroup;
-
-       /** So that we can give out state when the user uses the same key. */
-       protected HashMap<String, KvState<?>> keyValueStatesByName;
-
-       /** For caching the last accessed partitioned state */
-       private String lastName;
-
-       @SuppressWarnings("rawtypes")
-       private KvState lastState;
-
-       /** The number of key-groups aka max parallelism */
-       protected final int numberOfKeyGroups;
-
-       /** Range of key-groups for which this backend is responsible */
-       protected final KeyGroupRange keyGroupRange;
-
-       /** KvStateRegistry helper for this task */
-       protected final TaskKvStateRegistry kvStateRegistry;
-
-       protected final ClassLoader userCodeClassLoader;
-
-       public KeyedStateBackend(
-                       TaskKvStateRegistry kvStateRegistry,
-                       TypeSerializer<K> keySerializer,
-                       ClassLoader userCodeClassLoader,
-                       int numberOfKeyGroups,
-                       KeyGroupRange keyGroupRange) {
-
-               this.kvStateRegistry = 
Preconditions.checkNotNull(kvStateRegistry);
-               this.keySerializer = Preconditions.checkNotNull(keySerializer);
-               this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
-               this.numberOfKeyGroups = 
Preconditions.checkNotNull(numberOfKeyGroups);
-               this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
-       }
+public interface KeyedStateBackend<K> {
 
        /**
-        * Closes the state backend, releasing all internal resources, but does 
not delete any persistent
-        * checkpoint data.
-        *
-        * @throws Exception Exceptions can be forwarded and will be logged by 
the system
+        * Sets the current key that is used for partitioned state.
+        * @param newKey The new current key.
         */
-       public void close() throws Exception {
-               if (kvStateRegistry != null) {
-                       kvStateRegistry.unregisterAll();
-               }
-
-               lastName = null;
-               lastState = null;
-               keyValueStatesByName = null;
-       }
+       void setCurrentKey(K newKey);
 
        /**
-        * Creates and returns a new {@link ValueState}.
-        *
-        * @param namespaceSerializer TypeSerializer for the state namespace.
-        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
-        *
-        * @param <N> The type of the namespace.
-        * @param <T> The type of the value that the {@code ValueState} can 
store.
+        * Used by states to access the current key.
         */
-       protected abstract <N, T> ValueState<T> 
createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> 
stateDesc) throws Exception;
+       K getCurrentKey();
 
        /**
-        * Creates and returns a new {@link ListState}.
-        *
-        * @param namespaceSerializer TypeSerializer for the state namespace.
-        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
-        *
-        * @param <N> The type of the namespace.
-        * @param <T> The type of the values that the {@code ListState} can 
store.
+        * Returns the key-group to which the current key belongs.
         */
-       protected abstract <N, T> ListState<T> 
createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> 
stateDesc) throws Exception;
+       int getCurrentKeyGroupIndex();
 
        /**
-        * Creates and returns a new {@link ReducingState}.
-        *
-        * @param namespaceSerializer TypeSerializer for the state namespace.
-        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
-        *
-        * @param <N> The type of the namespace.
-        * @param <T> The type of the values that the {@code ListState} can 
store.
+        * Returns the number of key-groups aka max parallelism.
         */
-       protected abstract <N, T> ReducingState<T> 
createReducingState(TypeSerializer<N> namespaceSerializer, 
ReducingStateDescriptor<T> stateDesc) throws Exception;
+       int getNumberOfKeyGroups();
 
        /**
-        * Creates and returns a new {@link FoldingState}.
-        *
-        * @param namespaceSerializer TypeSerializer for the state namespace.
-        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
-        *
-        * @param <N> The type of the namespace.
-        * @param <T> Type of the values folded into the state
-        * @param <ACC> Type of the value in the state   *
+        * Returns the key group range for this backend.
         */
-       protected abstract <N, T, ACC> FoldingState<T, ACC> 
createFoldingState(TypeSerializer<N> namespaceSerializer, 
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
-
-       /**
-        * Sets the current key that is used for partitioned state.
-        * @param newKey The new current key.
-        */
-       public void setCurrentKey(K newKey) {
-               this.currentKey = newKey;
-               this.currentKeyGroup = 
KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups);
-       }
+       KeyGroupRange getKeyGroupRange();
 
        /**
         * {@link TypeSerializer} for the state backend key type.
         */
-       public TypeSerializer<K> getKeySerializer() {
-               return keySerializer;
-       }
-
-       /**
-        * Used by states to access the current key.
-        */
-       public K getCurrentKey() {
-               return currentKey;
-       }
-
-       public int getCurrentKeyGroupIndex() {
-               return currentKeyGroup;
-       }
-
-       public int getNumberOfKeyGroups() {
-               return numberOfKeyGroups;
-       }
+       TypeSerializer<K> getKeySerializer();
 
        /**
         * Creates or retrieves a partitioned state backed by this state 
backend.
         *
-        * @param stateDescriptor The state identifier for the state. This 
contains name
-        *                           and can create a default state value.
+        * @param stateDescriptor The identifier for the state. It contains the name and can create a default state value.
 
         * @param <N> The type of the namespace.
         * @param <S> The type of the state.
@@ -199,145 +76,21 @@ public abstract class KeyedStateBackend<K> {
         * @throws Exception Exceptions may occur during initialization of the 
state and should be forwarded.
         */
        @SuppressWarnings({"rawtypes", "unchecked"})
-       public <N, S extends State> S getPartitionedState(final N namespace, 
final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> 
stateDescriptor) throws Exception {
-               Preconditions.checkNotNull(namespace, "Namespace");
-               Preconditions.checkNotNull(namespaceSerializer, "Namespace 
serializer");
-
-               if (keySerializer == null) {
-                       throw new RuntimeException("State key serializer has 
not been configured in the config. " +
-                                       "This operation cannot use partitioned 
state.");
-               }
-               
-               if (!stateDescriptor.isSerializerInitialized()) {
-                       stateDescriptor.initializeSerializerUnlessSet(new 
ExecutionConfig());
-               }
-
-               if (keyValueStatesByName == null) {
-                       keyValueStatesByName = new HashMap<>();
-               }
-
-               if (lastName != null && 
lastName.equals(stateDescriptor.getName())) {
-                       lastState.setCurrentNamespace(namespace);
-                       return (S) lastState;
-               }
-
-               KvState<?> previous = 
keyValueStatesByName.get(stateDescriptor.getName());
-               if (previous != null) {
-                       lastState = previous;
-                       lastState.setCurrentNamespace(namespace);
-                       lastName = stateDescriptor.getName();
-                       return (S) previous;
-               }
-
-               // create a new blank key/value state
-               S state = stateDescriptor.bind(new StateBackend() {
-                       @Override
-                       public <T> ValueState<T> 
createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
-                               return 
KeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
-                       }
+       <N, S extends State> S getPartitionedState(
+                       N namespace,
+                       TypeSerializer<N> namespaceSerializer,
+                       StateDescriptor<S, ?> stateDescriptor) throws Exception;
 
-                       @Override
-                       public <T> ListState<T> 
createListState(ListStateDescriptor<T> stateDesc) throws Exception {
-                               return 
KeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
-                       }
-
-                       @Override
-                       public <T> ReducingState<T> 
createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
-                               return 
KeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
-                       }
-
-                       @Override
-                       public <T, ACC> FoldingState<T, ACC> 
createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-                               return 
KeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
-                       }
-
-               });
-
-               KvState kvState = (KvState) state;
-
-               keyValueStatesByName.put(stateDescriptor.getName(), kvState);
-
-               lastName = stateDescriptor.getName();
-               lastState = kvState;
-
-               kvState.setCurrentNamespace(namespace);
-
-               // Publish queryable state
-               if (stateDescriptor.isQueryable()) {
-                       if (kvStateRegistry == null) {
-                               throw new IllegalStateException("State backend 
has not been initialized for job.");
-                       }
-
-                       String name = stateDescriptor.getQueryableStateName();
-                       kvStateRegistry.registerKvState(keyGroupRange, name, 
kvState);
-               }
-
-               return state;
-       }
 
        @SuppressWarnings("unchecked,rawtypes")
-       public <N, S extends MergingState<?, ?>> void 
mergePartitionedStates(final N target, Collection<N> sources, final 
TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> 
stateDescriptor) throws Exception {
-               if (stateDescriptor instanceof ReducingStateDescriptor) {
-                       ReducingStateDescriptor reducingStateDescriptor = 
(ReducingStateDescriptor) stateDescriptor;
-                       ReduceFunction reduceFn = 
reducingStateDescriptor.getReduceFunction();
-                       ReducingState state = (ReducingState) 
getPartitionedState(target, namespaceSerializer, stateDescriptor);
-                       KvState kvState = (KvState) state;
-                       Object result = null;
-                       for (N source: sources) {
-                               kvState.setCurrentNamespace(source);
-                               Object sourceValue = state.get();
-                               if (result == null) {
-                                       result = state.get();
-                               } else if (sourceValue != null) {
-                                       result = reduceFn.reduce(result, 
sourceValue);
-                               }
-                               state.clear();
-                       }
-                       kvState.setCurrentNamespace(target);
-                       if (result != null) {
-                               state.add(result);
-                       }
-               } else if (stateDescriptor instanceof ListStateDescriptor) {
-                       ListState<Object> state = (ListState) 
getPartitionedState(target, namespaceSerializer, stateDescriptor);
-                       KvState kvState = (KvState) state;
-                       List<Object> result = new ArrayList<>();
-                       for (N source: sources) {
-                               kvState.setCurrentNamespace(source);
-                               Iterable<Object> sourceValue = state.get();
-                               if (sourceValue != null) {
-                                       for (Object o : sourceValue) {
-                                               result.add(o);
-                                       }
-                               }
-                               state.clear();
-                       }
-                       kvState.setCurrentNamespace(target);
-                       for (Object o : result) {
-                               state.add(o);
-                       }
-               } else {
-                       throw new RuntimeException("Cannot merge states for " + 
stateDescriptor);
-               }
-       }
+       <N, S extends MergingState<?, ?>> void mergePartitionedStates(
+                       N target,
+                       Collection<N> sources,
+                       TypeSerializer<N> namespaceSerializer,
+                       StateDescriptor<S, ?> stateDescriptor) throws Exception;
 
        /**
-        * Snapshots the keyed state by writing it to streams that are provided 
by a
-        * {@link CheckpointStreamFactory}.
-        *
-        * @param checkpointId The ID of the checkpoint.
-        * @param timestamp The timestamp of the checkpoint.
-        * @param streamFactory The factory that we can use for writing our 
state to streams.
-        *
-        * @return A future that will yield a {@link KeyGroupsStateHandle} with 
the index and
-        * written key group state stream.
+        * Disposes the backend and releases all resources.
         */
-       public abstract RunnableFuture<KeyGroupsStateHandle> snapshot(
-                       long checkpointId,
-                       long timestamp,
-                       CheckpointStreamFactory streamFactory) throws Exception;
-
-
-       public KeyGroupRange getKeyGroupRange() {
-               return keyGroupRange;
-       }
+       void dispose();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
new file mode 100644
index 0000000..4e980b7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Closeable;
+
+/**
+ * Interface that combines both the user-facing {@link OperatorStateStore} interface and the system interface
+ * {@link SnapshotProvider}, parameterized to produce {@link OperatorStateHandle}s. It is also {@link Closeable}.
+ */
+public interface OperatorStateBackend extends OperatorStateStore, 
SnapshotProvider<OperatorStateHandle>, Closeable {
+
+       /**
+        * Disposes the backend and releases all resources.
+        *
+        * <p>NOTE(review): how this relates to the {@code close()} method inherited from {@link Closeable} is not
+        * specified in this interface - confirm against the implementations.
+        */
+       void dispose();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
new file mode 100644
index 0000000..3e2d713
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * State handle for partitionable operator state. Besides being a {@link StreamStateHandle}, this also provides a
+ * map that contains the offsets to the partitions of named states in the stream.
+ *
+ * <p>All {@link StreamStateHandle} operations are forwarded to the wrapped delegate handle. Two handles are equal
+ * when their delegates are equal and their offset maps contain the same state names with equal offset arrays.
+ */
+public class OperatorStateHandle implements StreamStateHandle {
+
+       private static final long serialVersionUID = 35876522969227335L;
+
+       /** unique state name -> offsets for available partitions in the handle stream */
+       private final Map<String, long[]> stateNameToPartitionOffsets;
+
+       /** The wrapped stream handle to which all stream-handle operations are forwarded. */
+       private final StreamStateHandle delegateStateHandle;
+
+       /**
+        * Creates a handle from a stream handle and the partition offsets of the named states in that stream.
+        *
+        * @param delegateStateHandle The handle to the underlying stream; must not be null.
+        * @param stateNameToPartitionOffsets Map from state name to partition offsets; must not be null. The map is
+        *                                    stored by reference, it is not copied.
+        */
+       public OperatorStateHandle(
+                       StreamStateHandle delegateStateHandle,
+                       Map<String, long[]> stateNameToPartitionOffsets) {
+
+               this.delegateStateHandle = Preconditions.checkNotNull(delegateStateHandle);
+               this.stateNameToPartitionOffsets = Preconditions.checkNotNull(stateNameToPartitionOffsets);
+       }
+
+       /**
+        * Returns the map from state name to partition offsets.
+        *
+        * @return The internal map itself (no defensive copy is made).
+        */
+       public Map<String, long[]> getStateNameToPartitionOffsets() {
+               return stateNameToPartitionOffsets;
+       }
+
+       /** Forwards to the delegate handle. */
+       @Override
+       public void discardState() throws Exception {
+               delegateStateHandle.discardState();
+       }
+
+       /** Forwards to the delegate handle. */
+       @Override
+       public long getStateSize() throws IOException {
+               return delegateStateHandle.getStateSize();
+       }
+
+       /** Forwards to the delegate handle. */
+       @Override
+       public FSDataInputStream openInputStream() throws IOException {
+               return delegateStateHandle.openInputStream();
+       }
+
+       /**
+        * Returns the wrapped stream handle.
+        *
+        * @return The delegate handle passed to the constructor.
+        */
+       public StreamStateHandle getDelegateStateHandle() {
+               return delegateStateHandle;
+       }
+
+       /**
+        * Compares the offset maps entry by entry (arrays by content) and the delegate handles via {@code equals}.
+        *
+        * @param o The object to compare with.
+        * @return {@code true} if {@code o} is an {@link OperatorStateHandle} with an equal delegate and the same
+        *         state names mapped to equal offset arrays.
+        */
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+
+               if (!(o instanceof OperatorStateHandle)) {
+                       return false;
+               }
+
+               OperatorStateHandle that = (OperatorStateHandle) o;
+               Map<String, long[]> otherOffsets = that.stateNameToPartitionOffsets;
+
+               if (stateNameToPartitionOffsets.size() != otherOffsets.size()) {
+                       return false;
+               }
+
+               for (Map.Entry<String, long[]> entry : stateNameToPartitionOffsets.entrySet()) {
+                       String stateName = entry.getKey();
+                       long[] otherValue = otherOffsets.get(stateName);
+
+                       // get() returns null both for a missing key and for a key mapped to null;
+                       // a missing state name must never compare equal to a null offset array.
+                       if (otherValue == null && !otherOffsets.containsKey(stateName)) {
+                               return false;
+                       }
+
+                       if (!Arrays.equals(entry.getValue(), otherValue)) {
+                               return false;
+                       }
+               }
+
+               return delegateStateHandle.equals(that.delegateStateHandle);
+       }
+
+       /**
+        * Computes a hash that is consistent with {@link #equals(Object)}.
+        *
+        * @return The combined hash of the delegate handle and all map entries.
+        */
+       @Override
+       public int hashCode() {
+               // The entry hashes are summed so that the result does not depend on the iteration order of the map;
+               // equals() is order-independent, so the hash has to be order-independent as well.
+               int entriesHash = 0;
+               for (Map.Entry<String, long[]> entry : stateNameToPartitionOffsets.entrySet()) {
+                       // Arrays.hashCode(null) is 0, so a null offset array needs no special case.
+                       entriesHash += entry.getKey().hashCode() ^ Arrays.hashCode(entry.getValue());
+               }
+               return 31 * delegateStateHandle.hashCode() + entriesHash;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
new file mode 100644
index 0000000..6914a7c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+
+import java.util.Set;
+
+/**
+ * Interface for a backend that manages partitionable operator state.
+ */
+public interface OperatorStateStore {
+
+       /**
+        * Creates (or restores) the partitionable state in this backend. Each state is registered under a unique name.
+        * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
+        *
+        * @param stateDescriptor The descriptor for this state, providing a name and serializer.
+        * @param <S> The generic type of the state.
+        * @return A list for all state partitions.
+        * @throws Exception Declared for implementations; the failure conditions are implementation-specific.
+        */
+       <S> ListState<S> getPartitionableState(ListStateDescriptor<S> 
stateDescriptor) throws Exception;
+
+       /**
+        * Returns a set with the names of all currently registered states.
+        *
+        * @return Set of names for all registered states.
+        */
+       Set<String> getRegisteredStateNames();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableCheckpointStateOutputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableCheckpointStateOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableCheckpointStateOutputStream.java
new file mode 100644
index 0000000..065f9c2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableCheckpointStateOutputStream.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Output stream that wraps a {@link CheckpointStreamFactory.CheckpointStateOutputStream}, forwards all stream
+ * operations to it and records, per state name, the stream positions at which new partitions were started.
+ * The recorded offsets are handed to the {@link OperatorStateHandle} returned by {@link #closeAndGetHandle()}.
+ */
+public class PartitionableCheckpointStateOutputStream extends 
FSDataOutputStream {
+
+       /** State name -> stream positions recorded by {@link #startNewPartition(String)}, in call order. */
+       private final Map<String, long[]> stateNameToPartitionOffsets;
+       /** The wrapped stream that receives all forwarded operations. */
+       private final CheckpointStreamFactory.CheckpointStateOutputStream 
delegate;
+
+       /**
+        * Creates a stream that forwards to the given delegate and starts with no recorded partition offsets.
+        *
+        * @param delegate The stream to wrap; must not be null.
+        */
+       public 
PartitionableCheckpointStateOutputStream(CheckpointStreamFactory.CheckpointStateOutputStream
 delegate) {
+               this.delegate = Preconditions.checkNotNull(delegate);
+               this.stateNameToPartitionOffsets = new HashMap<>();
+       }
+
+       /** Returns the current position as reported by the wrapped stream. */
+       @Override
+       public long getPos() throws IOException {
+               return delegate.getPos();
+       }
+
+       /** Forwards to the wrapped stream. */
+       @Override
+       public void flush() throws IOException {
+               delegate.flush();
+       }
+
+       /** Forwards to the wrapped stream. */
+       @Override
+       public void sync() throws IOException {
+               delegate.sync();
+       }
+
+       /** Forwards to the wrapped stream. */
+       @Override
+       public void write(int b) throws IOException {
+               delegate.write(b);
+       }
+
+       /** Forwards to the wrapped stream. */
+       @Override
+       public void write(byte[] b) throws IOException {
+               delegate.write(b);
+       }
+
+       /** Forwards to the wrapped stream. */
+       @Override
+       public void write(byte[] b, int off, int len) throws IOException {
+               delegate.write(b, off, len);
+       }
+
+       /** Closes the wrapped stream; no state handle is produced by this method. */
+       @Override
+       public void close() throws IOException {
+               delegate.close();
+       }
+
+       /**
+        * Calls {@code closeAndGetHandle()} on the wrapped stream and combines the resulting stream handle with the
+        * partition offsets recorded so far.
+        *
+        * @return An {@link OperatorStateHandle} holding the stream handle and the recorded offsets map.
+        * @throws IOException Forwarded from the wrapped stream.
+        */
+       public OperatorStateHandle closeAndGetHandle() throws IOException {
+               StreamStateHandle streamStateHandle = 
delegate.closeAndGetHandle();
+               // The internal offsets map is passed on as-is (not copied).
+               return new OperatorStateHandle(streamStateHandle, 
stateNameToPartitionOffsets);
+       }
+
+       /**
+        * Records the current stream position ({@link #getPos()}) as the start of a new partition of the given state.
+        * Offsets recorded for the same state name accumulate in call order.
+        *
+        * @param stateName The name of the state for which a new partition begins.
+        * @throws IOException If the current position cannot be obtained from the wrapped stream.
+        */
+       public void startNewPartition(String stateName) throws IOException {
+               long[] offs = stateNameToPartitionOffsets.get(stateName);
+               if (offs == null) {
+                       // First partition recorded for this state name.
+                       offs = new long[1];
+               } else {
+                       // TODO maybe we can use some primitive array list here instead of an array to avoid resize on each call.
+                       offs = Arrays.copyOf(offs, offs.length + 1);
+               }
+
+               // The last slot is the newly added one; it receives the current position.
+               offs[offs.length - 1] = getPos();
+               stateNameToPartitionOffsets.put(stateName, offs);
+       }
+
+       /**
+        * Static factory, equivalent to calling the constructor with the given stream.
+        *
+        * @param stream The stream to wrap; must not be null.
+        * @return A new {@link PartitionableCheckpointStateOutputStream} around {@code stream}.
+        */
+       public static PartitionableCheckpointStateOutputStream wrap(
+                       CheckpointStreamFactory.CheckpointStateOutputStream 
stream) {
+               return new PartitionableCheckpointStateOutputStream(stream);
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
index 9ecc4c9..9934382 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
@@ -76,6 +76,6 @@ public class RetrievableStreamStateHandle<T extends 
Serializable> implements
 
        @Override
        public void close() throws IOException {
-               wrappedStreamStateHandle.close();
+//             wrappedStreamStateHandle.close();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotProvider.java
new file mode 100644
index 0000000..c47fedd
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Interface for operations that can perform snapshots of their state.
+ *
+ * @param <S> Generic type of the state object that is created as handle to 
snapshots.
+ */
+public interface SnapshotProvider<S extends StateObject> {
+
+       /**
+        * Operation that writes a snapshot into a stream that is provided by 
the given {@link CheckpointStreamFactory} and
+        * returns a {@link RunnableFuture} that gives a state handle to the 
snapshot. It is up to the implementation whether
+        * the operation is performed synchronously or asynchronously. In the latter 
case, the returned Runnable must be executed
+        * first before obtaining the handle.
+        *
+        * @param checkpointId  The ID of the checkpoint.
+        * @param timestamp     The timestamp of the checkpoint.
+        * @param streamFactory The factory that we can use for writing our 
state to streams.
+        * @return A runnable future that will yield a {@link StateObject}.
+        */
+       RunnableFuture<S> snapshot(
+                       long checkpointId,
+                       long timestamp,
+                       CheckpointStreamFactory streamFactory) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
index 4c65318..a502b9d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
@@ -28,13 +28,9 @@ import java.io.IOException;
  * <ul>
  *     <li><b>Discard State</b>: The {@link #discardState()} method defines 
how state is permanently
  *         disposed/deleted. After that method call, state may not be 
recoverable any more.</li>
- 
- *     <li><b>Close the current state access</b>: The {@link #close()} method 
defines how to
- *         stop the current access or recovery to the state. Called for 
example when an operation is
- *         canceled during recovery.</li>
  * </ul>
  */
-public interface StateObject extends java.io.Closeable, java.io.Serializable {
+public interface StateObject extends java.io.Serializable {
 
        /**
         * Discards the state referred to by this handle, to free up resources 
in

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index aa28404..a4799bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import java.io.IOException;
-
 /**
  * Helpers for {@link StateObject} related code.
  */
@@ -63,39 +61,4 @@ public class StateUtil {
                        }
                }
        }
-
-       /**
-        * Iterates through the passed state handles and calls discardState() 
on each handle that is not null. All
-        * occurring exceptions are suppressed and collected until the 
iteration is over and emitted as a single exception.
-        *
-        * @param handlesToDiscard State handles to discard. Passed iterable is 
allowed to deliver null values.
-        * @throws IOException exception that is a collection of all suppressed 
exceptions that were caught during iteration
-        */
-       public static void bestEffortCloseAllStateObjects(
-                       Iterable<? extends StateObject> handlesToDiscard) 
throws IOException {
-
-               if (handlesToDiscard != null) {
-
-                       IOException suppressedExceptions = null;
-
-                       for (StateObject state : handlesToDiscard) {
-
-                               if (state != null) {
-                                       try {
-                                               state.close();
-                                       } catch (Exception ex) {
-                                               //best effort to still cleanup 
other states and deliver exceptions in the end
-                                               if (suppressedExceptions == 
null) {
-                                                       suppressedExceptions = 
new IOException(ex);
-                                               }
-                                               
suppressedExceptions.addSuppressed(ex);
-                                       }
-                               }
-                       }
-
-                       if (suppressedExceptions != null) {
-                               throw suppressedExceptions;
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index f361263..29e905c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.state.filesystem;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.AbstractCloseableHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
 import java.io.IOException;
@@ -34,7 +33,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * {@link StreamStateHandle} for state that was written to a file stream. The 
written data is
  * identifier by the file path. The state can be read again by calling {@link 
#openInputStream()}.
  */
-public class FileStateHandle extends AbstractCloseableHandle implements 
StreamStateHandle {
+public class FileStateHandle implements StreamStateHandle {
 
        private static final long serialVersionUID = 350284443258002355L;
 
@@ -69,10 +68,7 @@ public class FileStateHandle extends AbstractCloseableHandle 
implements StreamSt
 
        @Override
        public FSDataInputStream openInputStream() throws IOException {
-               ensureNotClosed();
-               FSDataInputStream inputStream = getFileSystem().open(filePath);
-               registerCloseable(inputStream);
-               return inputStream;
+               return getFileSystem().open(filePath);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 99e3684..e027632 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -24,11 +24,11 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -175,7 +175,7 @@ public class FsStateBackend extends AbstractStateBackend {
        }
 
        @Override
-       public <K> KeyedStateBackend<K> createKeyedStateBackend(
+       public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
                        Environment env,
                        JobID jobID,
                        String operatorIdentifier,
@@ -192,7 +192,7 @@ public class FsStateBackend extends AbstractStateBackend {
        }
 
        @Override
-       public <K> KeyedStateBackend<K> restoreKeyedStateBackend(
+       public <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend(
                        Environment env,
                        JobID jobID,
                        String operatorIdentifier,

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index c13be70..a766373 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.heap;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
@@ -27,17 +28,18 @@ import 
org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -51,13 +53,13 @@ import java.util.Map;
 import java.util.concurrent.RunnableFuture;
 
 /**
- * A {@link KeyedStateBackend} that keeps state on the Java Heap and will 
serialize state to
+ * An {@link AbstractKeyedStateBackend} that keeps state on the Java Heap and 
will serialize state to
  * streams provided by a {@link 
org.apache.flink.runtime.state.CheckpointStreamFactory} upon
  * checkpointing.
  *
  * @param <K> The key by which state is keyed.
  */
-public class HeapKeyedStateBackend<K> extends KeyedStateBackend<K> {
+public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(HeapKeyedStateBackend.class);
 
@@ -165,85 +167,83 @@ public class HeapKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
                        long timestamp,
                        CheckpointStreamFactory streamFactory) throws Exception 
{
 
-               CheckpointStreamFactory.CheckpointStateOutputStream stream =
-                               streamFactory.createCheckpointStateOutputStream(
-                                               checkpointId,
-                                               timestamp);
-
                if (stateTables.isEmpty()) {
                        return new DoneFuture<>(null);
                }
 
-               DataOutputViewStreamWrapper outView = new 
DataOutputViewStreamWrapper(stream);
+               try (CheckpointStreamFactory.CheckpointStateOutputStream stream 
= streamFactory.
+                               createCheckpointStateOutputStream(checkpointId, 
timestamp)) {
 
-               Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE,
-                               "Too many KV-States: " + stateTables.size() +
-                                               ". Currently at most " + 
Short.MAX_VALUE + " states are supported");
+                       DataOutputViewStreamWrapper outView = new 
DataOutputViewStreamWrapper(stream);
 
-               outView.writeShort(stateTables.size());
+                       Preconditions.checkState(stateTables.size() <= 
Short.MAX_VALUE,
+                                       "Too many KV-States: " + 
stateTables.size() +
+                                                       ". Currently at most " 
+ Short.MAX_VALUE + " states are supported");
 
-               Map<String, Integer> kVStateToId = new 
HashMap<>(stateTables.size());
+                       outView.writeShort(stateTables.size());
 
-               for (Map.Entry<String, StateTable<K, ?, ?>> kvState : 
stateTables.entrySet()) {
+                       Map<String, Integer> kVStateToId = new 
HashMap<>(stateTables.size());
 
-                       outView.writeUTF(kvState.getKey());
+                       for (Map.Entry<String, StateTable<K, ?, ?>> kvState : 
stateTables.entrySet()) {
 
-                       TypeSerializer namespaceSerializer = 
kvState.getValue().getNamespaceSerializer();
-                       TypeSerializer stateSerializer = 
kvState.getValue().getStateSerializer();
+                               outView.writeUTF(kvState.getKey());
 
-                       InstantiationUtil.serializeObject(stream, 
namespaceSerializer);
-                       InstantiationUtil.serializeObject(stream, 
stateSerializer);
+                               TypeSerializer namespaceSerializer = 
kvState.getValue().getNamespaceSerializer();
+                               TypeSerializer stateSerializer = 
kvState.getValue().getStateSerializer();
 
-                       kVStateToId.put(kvState.getKey(), kVStateToId.size());
-               }
+                               InstantiationUtil.serializeObject(stream, 
namespaceSerializer);
+                               InstantiationUtil.serializeObject(stream, 
stateSerializer);
 
-               int offsetCounter = 0;
-               long[] keyGroupRangeOffsets = new 
long[keyGroupRange.getNumberOfKeyGroups()];
+                               kVStateToId.put(kvState.getKey(), 
kVStateToId.size());
+                       }
 
-               for (int keyGroupIndex = keyGroupRange.getStartKeyGroup(); 
keyGroupIndex <= keyGroupRange.getEndKeyGroup(); keyGroupIndex++) {
-                       keyGroupRangeOffsets[offsetCounter++] = stream.getPos();
-                       outView.writeInt(keyGroupIndex);
+                       int offsetCounter = 0;
+                       long[] keyGroupRangeOffsets = new 
long[keyGroupRange.getNumberOfKeyGroups()];
 
-                       for (Map.Entry<String, StateTable<K, ?, ?>> kvState : 
stateTables.entrySet()) {
+                       for (int keyGroupIndex = 
keyGroupRange.getStartKeyGroup(); keyGroupIndex <= 
keyGroupRange.getEndKeyGroup(); keyGroupIndex++) {
+                               keyGroupRangeOffsets[offsetCounter++] = 
stream.getPos();
+                               outView.writeInt(keyGroupIndex);
 
-                               
outView.writeShort(kVStateToId.get(kvState.getKey()));
+                               for (Map.Entry<String, StateTable<K, ?, ?>> 
kvState : stateTables.entrySet()) {
 
-                               TypeSerializer namespaceSerializer = 
kvState.getValue().getNamespaceSerializer();
-                               TypeSerializer stateSerializer = 
kvState.getValue().getStateSerializer();
+                                       
outView.writeShort(kVStateToId.get(kvState.getKey()));
 
-                               // Map<NamespaceT, Map<KeyT, StateT>>
-                               Map<?, ? extends Map<K, ?>> namespaceMap = 
kvState.getValue().get(keyGroupIndex);
-                               if (namespaceMap == null) {
-                                       outView.writeByte(0);
-                                       continue;
-                               }
+                                       TypeSerializer namespaceSerializer = 
kvState.getValue().getNamespaceSerializer();
+                                       TypeSerializer stateSerializer = 
kvState.getValue().getStateSerializer();
+
+                                       // Map<NamespaceT, Map<KeyT, StateT>>
+                                       Map<?, ? extends Map<K, ?>> 
namespaceMap = kvState.getValue().get(keyGroupIndex);
+                                       if (namespaceMap == null) {
+                                               outView.writeByte(0);
+                                               continue;
+                                       }
 
-                               outView.writeByte(1);
+                                       outView.writeByte(1);
 
-                               // number of namespaces
-                               outView.writeInt(namespaceMap.size());
-                               for (Map.Entry<?, ? extends Map<K, ?>> 
namespace : namespaceMap.entrySet()) {
-                                       
namespaceSerializer.serialize(namespace.getKey(), outView);
+                                       // number of namespaces
+                                       outView.writeInt(namespaceMap.size());
+                                       for (Map.Entry<?, ? extends Map<K, ?>> 
namespace : namespaceMap.entrySet()) {
+                                               
namespaceSerializer.serialize(namespace.getKey(), outView);
 
-                                       Map<K, ?> entryMap = 
namespace.getValue();
+                                               Map<K, ?> entryMap = 
namespace.getValue();
 
-                                       // number of entries
-                                       outView.writeInt(entryMap.size());
-                                       for (Map.Entry<K, ?> entry : 
entryMap.entrySet()) {
-                                               
keySerializer.serialize(entry.getKey(), outView);
-                                               
stateSerializer.serialize(entry.getValue(), outView);
+                                               // number of entries
+                                               
outView.writeInt(entryMap.size());
+                                               for (Map.Entry<K, ?> entry : 
entryMap.entrySet()) {
+                                                       
keySerializer.serialize(entry.getKey(), outView);
+                                                       
stateSerializer.serialize(entry.getValue(), outView);
+                                               }
                                        }
                                }
+                               outView.flush();
                        }
-                       outView.flush();
-               }
-
-               StreamStateHandle streamStateHandle = 
stream.closeAndGetHandle();
 
-               KeyGroupRangeOffsets offsets = new 
KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
-               final KeyGroupsStateHandle keyGroupsStateHandle = new 
KeyGroupsStateHandle(offsets, streamStateHandle);
+                       StreamStateHandle streamStateHandle = 
stream.closeAndGetHandle();
 
-               return new DoneFuture(keyGroupsStateHandle);
+                       KeyGroupRangeOffsets offsets = new 
KeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);
+                       final KeyGroupsStateHandle keyGroupsStateHandle = new 
KeyGroupsStateHandle(offsets, streamStateHandle);
+                       return new DoneFuture<>(keyGroupsStateHandle);
+               }
        }
 
        @SuppressWarnings({"unchecked", "rawtypes"})
@@ -251,71 +251,81 @@ public class HeapKeyedStateBackend<K> extends 
KeyedStateBackend<K> {
 
                for (KeyGroupsStateHandle keyGroupsHandle : state) {
 
-                       if(keyGroupsHandle == null) {
+                       if (keyGroupsHandle == null) {
                                continue;
                        }
 
-                       FSDataInputStream fsDataInputStream = 
keyGroupsHandle.getStateHandle().openInputStream();
-                       DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(fsDataInputStream);
+                       FSDataInputStream fsDataInputStream = null;
 
-                       int numKvStates = inView.readShort();
+                       try {
 
-                       Map<Integer, String> kvStatesById = new 
HashMap<>(numKvStates);
+                               fsDataInputStream = 
keyGroupsHandle.getStateHandle().openInputStream();
+                               
cancelStreamRegistry.registerClosable(fsDataInputStream);
 
-                       for (int i = 0; i < numKvStates; ++i) {
-                               String stateName = inView.readUTF();
+                               DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(fsDataInputStream);
 
-                               TypeSerializer namespaceSerializer =
-                                               
InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
-                               TypeSerializer stateSerializer =
-                                               
InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
+                               int numKvStates = inView.readShort();
 
-                               StateTable<K, ?, ?> stateTable = new StateTable(
-                                               stateSerializer,
-                                               namespaceSerializer,
-                                               keyGroupRange);
-                               stateTables.put(stateName, stateTable);
-                               kvStatesById.put(i, stateName);
-                       }
+                               Map<Integer, String> kvStatesById = new 
HashMap<>(numKvStates);
+
+                               for (int i = 0; i < numKvStates; ++i) {
+                                       String stateName = inView.readUTF();
 
-                       for (int keyGroupIndex = 
keyGroupRange.getStartKeyGroup();  keyGroupIndex <= 
keyGroupRange.getEndKeyGroup(); ++keyGroupIndex) {
-                               long offset = 
keyGroupsHandle.getOffsetForKeyGroup(keyGroupIndex);
-                               fsDataInputStream.seek(offset);
+                                       TypeSerializer namespaceSerializer =
+                                                       
InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
+                                       TypeSerializer stateSerializer =
+                                                       
InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
 
-                               int writtenKeyGroupIndex = inView.readInt();
-                               assert writtenKeyGroupIndex == keyGroupIndex;
+                                       StateTable<K, ?, ?> stateTable = new 
StateTable(stateSerializer,
+                                                       namespaceSerializer,
+                                                       keyGroupRange);
+                                       stateTables.put(stateName, stateTable);
+                                       kvStatesById.put(i, stateName);
+                               }
 
-                               for (int i = 0; i < numKvStates; i++) {
-                                       int kvStateId = inView.readShort();
+                               for (Tuple2<Integer, Long> groupOffset : 
keyGroupsHandle.getGroupRangeOffsets()) {
+                                       int keyGroupIndex = groupOffset.f0;
+                                       long offset = groupOffset.f1;
+                                       fsDataInputStream.seek(offset);
 
-                                       byte isPresent = inView.readByte();
-                                       if (isPresent == 0) {
-                                               continue;
-                                       }
+                                       int writtenKeyGroupIndex = 
inView.readInt();
+                                       assert writtenKeyGroupIndex == 
keyGroupIndex;
+
+                                       for (int i = 0; i < numKvStates; i++) {
+                                               int kvStateId = 
inView.readShort();
+
+                                               byte isPresent = 
inView.readByte();
+                                               if (isPresent == 0) {
+                                                       continue;
+                                               }
 
-                                       StateTable<K, ?, ?> stateTable = 
stateTables.get(kvStatesById.get(kvStateId));
-                                       Preconditions.checkNotNull(stateTable);
+                                               StateTable<K, ?, ?> stateTable 
= stateTables.get(kvStatesById.get(kvStateId));
+                                               
Preconditions.checkNotNull(stateTable);
 
-                                       TypeSerializer namespaceSerializer = 
stateTable.getNamespaceSerializer();
-                                       TypeSerializer stateSerializer = 
stateTable.getStateSerializer();
+                                               TypeSerializer 
namespaceSerializer = stateTable.getNamespaceSerializer();
+                                               TypeSerializer stateSerializer 
= stateTable.getStateSerializer();
 
-                                       Map namespaceMap = new HashMap<>();
-                                       stateTable.set(keyGroupIndex, 
namespaceMap);
+                                               Map namespaceMap = new 
HashMap<>();
+                                               stateTable.set(keyGroupIndex, 
namespaceMap);
 
-                                       int numNamespaces = inView.readInt();
-                                       for (int k = 0; k < numNamespaces; k++) 
{
-                                               Object namespace = 
namespaceSerializer.deserialize(inView);
-                                               Map entryMap = new HashMap<>();
-                                               namespaceMap.put(namespace, 
entryMap);
+                                               int numNamespaces = 
inView.readInt();
+                                               for (int k = 0; k < 
numNamespaces; k++) {
+                                                       Object namespace = 
namespaceSerializer.deserialize(inView);
+                                                       Map entryMap = new 
HashMap<>();
+                                                       
namespaceMap.put(namespace, entryMap);
 
-                                               int numEntries = 
inView.readInt();
-                                               for (int l = 0; l < numEntries; 
l++) {
-                                                       Object key = 
keySerializer.deserialize(inView);
-                                                       Object value = 
stateSerializer.deserialize(inView);
-                                                       entryMap.put(key, 
value);
+                                                       int numEntries = 
inView.readInt();
+                                                       for (int l = 0; l < 
numEntries; l++) {
+                                                               Object key = 
keySerializer.deserialize(inView);
+                                                               Object value = 
stateSerializer.deserialize(inView);
+                                                               
entryMap.put(key, value);
+                                                       }
                                                }
                                        }
                                }
+                       } finally {
+                               
cancelStreamRegistry.unregisterClosable(fsDataInputStream);
+                               IOUtils.closeQuietly(fsDataInputStream);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index b9ff255..7d8b6ce 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -19,10 +19,8 @@
 package org.apache.flink.runtime.state.memory;
 
 import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.runtime.state.AbstractCloseableHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.InstantiationUtil;
-
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -32,7 +30,7 @@ import java.util.Arrays;
 /**
  * A state handle that contains stream state in a byte array.
  */
-public class ByteStreamStateHandle extends AbstractCloseableHandle implements 
StreamStateHandle {
+public class ByteStreamStateHandle implements StreamStateHandle {
 
        private static final long serialVersionUID = -5280226231200217594L;
 
@@ -52,9 +50,8 @@ public class ByteStreamStateHandle extends AbstractCloseableHandle implements St
 
        @Override
        public FSDataInputStream openInputStream() throws IOException {
-               ensureNotClosed();
 
-               FSDataInputStream inputStream = new FSDataInputStream() {
+               return new FSDataInputStream() {
                        int index = 0;
 
                        @Override
@@ -73,8 +70,6 @@ public class ByteStreamStateHandle extends 
AbstractCloseableHandle implements St
                                return index < data.length ? data[index++] & 0xFF : -1;
                        }
                };
-               registerCloseable(inputStream);
-               return inputStream;
        }
 
        public byte[] getData() {
@@ -106,9 +101,7 @@ public class ByteStreamStateHandle extends 
AbstractCloseableHandle implements St
 
        @Override
        public int hashCode() {
-               int result = super.hashCode();
-               result = 31 * result + Arrays.hashCode(data);
-               return result;
+               return Arrays.hashCode(data);
        }
 
        public static StreamStateHandle fromSerializable(Serializable value) 
throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index cc145ff..1772dbe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -22,11 +22,11 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 
 import java.io.IOException;
@@ -71,12 +71,13 @@ public class MemoryStateBackend extends 
AbstractStateBackend {
        }
 
        @Override
-       public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
+       public CheckpointStreamFactory createStreamFactory(
+                       JobID jobId, String operatorIdentifier) throws IOException {
                return new MemCheckpointStreamFactory(maxStateSize);
        }
 
        @Override
-       public <K> KeyedStateBackend<K> createKeyedStateBackend(
+       public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
                        Environment env, JobID jobID,
                        String operatorIdentifier,
                        TypeSerializer<K> keySerializer,
@@ -93,7 +94,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
        }
 
        @Override
-       public <K> KeyedStateBackend<K> restoreKeyedStateBackend(
+       public <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend(
                        Environment env, JobID jobID,
                        String operatorIdentifier,
                        TypeSerializer<K> keySerializer,

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
index c317bed..8bf1127 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
@@ -23,13 +23,9 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.util.Preconditions;
 
-import java.util.List;
-
 /**
  * Implementation using {@link ActorGateway} to forward the messages.
  */
@@ -46,8 +42,7 @@ public class ActorGatewayCheckpointResponder implements 
CheckpointResponder {
                        JobID jobID,
                        ExecutionAttemptID executionAttemptID,
                        long checkpointID,
-                       ChainedStateHandle<StreamStateHandle> 
chainedStateHandle,
-                       List<KeyGroupsStateHandle> keyGroupStateHandles,
+                       CheckpointStateHandles checkpointStateHandles,
                        long synchronousDurationMillis,
                        long asynchronousDurationMillis,
                        long bytesBufferedInAlignment,
@@ -55,7 +50,7 @@ public class ActorGatewayCheckpointResponder implements 
CheckpointResponder {
 
                AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
                                jobID, executionAttemptID, checkpointID,
-                               chainedStateHandle, keyGroupStateHandles,
+                               checkpointStateHandles,
                                synchronousDurationMillis, 
asynchronousDurationMillis,
                                bytesBufferedInAlignment, 
alignmentDurationNanos);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
index b3f9827..698a7f4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
@@ -20,11 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-
-import java.util.List;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 
 /**
  * Responder for checkpoint acknowledge and decline messages in the {@link 
Task}.
@@ -40,10 +36,8 @@ public interface CheckpointResponder {
         *             Execution attempt ID of the running task
         * @param checkpointID
         *             Checkpoint ID of the checkpoint
-        * @param chainedStateHandle
-        *             Chained state handle
-        * @param keyGroupStateHandles
-        *             State handles for key groups
+        * @param checkpointStateHandles 
+        *             State handles for the checkpoint
         * @param synchronousDurationMillis
         *             The duration (in milliseconds) of the synchronous part 
of the operator checkpoint
         * @param asynchronousDurationMillis
@@ -57,8 +51,7 @@ public interface CheckpointResponder {
                JobID jobID,
                ExecutionAttemptID executionAttemptID,
                long checkpointID,
-               ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-               List<KeyGroupsStateHandle> keyGroupStateHandles,
+               CheckpointStateHandles checkpointStateHandles,
                long synchronousDurationMillis,
                long asynchronousDurationMillis,
                long bytesBufferedInAlignment,

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 23b6f82..c2ba7ef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -35,11 +35,8 @@ import 
org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
@@ -246,7 +243,7 @@ public class RuntimeEnvironment implements Environment {
                        long bytesBufferedInAlignment,
                        long alignmentDurationNanos) {
 
-               acknowledgeCheckpoint(checkpointId, null, null,
+               acknowledgeCheckpoint(checkpointId, null,
                                synchronousDurationMillis, 
asynchronousDurationMillis,
                                bytesBufferedInAlignment, 
alignmentDurationNanos);
        }
@@ -254,8 +251,7 @@ public class RuntimeEnvironment implements Environment {
        @Override
        public void acknowledgeCheckpoint(
                        long checkpointId,
-                       ChainedStateHandle<StreamStateHandle> 
chainedStateHandle,
-                       List<KeyGroupsStateHandle> keyGroupStateHandles,
+                       CheckpointStateHandles checkpointStateHandles,
                        long synchronousDurationMillis,
                        long asynchronousDurationMillis,
                        long bytesBufferedInAlignment,
@@ -264,7 +260,7 @@ public class RuntimeEnvironment implements Environment {
 
                checkpointResponder.acknowledgeCheckpoint(
                                jobId, executionId, checkpointId,
-                               chainedStateHandle, keyGroupStateHandles,
+                               checkpointStateHandles,
                                synchronousDurationMillis, 
asynchronousDurationMillis,
                                bytesBufferedInAlignment, 
alignmentDurationNanos);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 62dc8b7..8463fa0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -59,6 +59,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
 import org.apache.flink.util.Preconditions;
@@ -68,6 +69,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -241,6 +243,8 @@ public class Task implements Runnable, TaskActions {
         */
        private volatile List<KeyGroupsStateHandle> keyGroupStates;
 
+       private volatile List<Collection<OperatorStateHandle>> partitionableOperatorState;
+
        /** Initialized from the Flink configuration. May also be set at the 
ExecutionConfig */
        private long taskCancellationInterval;
 
@@ -278,6 +282,7 @@ public class Task implements Runnable, TaskActions {
                this.chainedOperatorState = tdd.getOperatorState();
                this.serializedExecutionConfig = 
checkNotNull(tdd.getSerializedExecutionConfig());
                this.keyGroupStates = tdd.getKeyGroupState();
+               this.partitionableOperatorState = tdd.getPartitionableOperatorState();
 
                this.taskCancellationInterval = jobConfiguration.getLong(
                        ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
@@ -488,7 +493,7 @@ public class Task implements Runnable, TaskActions {
                Map<String, Future<Path>> distributedCacheEntries = new 
HashMap<String, Future<Path>>();
                AbstractInvokable invokable = null;
 
-               ClassLoader userCodeClassLoader = null;
+               ClassLoader userCodeClassLoader;
                try {
                        // ----------------------------
                        //  Task Bootstrap - We periodically
@@ -564,10 +569,10 @@ public class Task implements Runnable, TaskActions {
                        // the state into the task. the state is non-empty if this is an execution
                        // of a task that failed but had backuped state from a checkpoint
 
-                       if (chainedOperatorState != null || keyGroupStates != null) {
+                       if (chainedOperatorState != null || keyGroupStates != null || partitionableOperatorState != null) {
                                if (invokable instanceof StatefulTask) {
                                        StatefulTask op = (StatefulTask) 
invokable;
-                                       op.setInitialState(chainedOperatorState, keyGroupStates);
+                                       op.setInitialState(chainedOperatorState, keyGroupStates, partitionableOperatorState);
                                } else {
                                        throw new IllegalStateException("Found operator state for a non-stateful task invokable");
                                }

Reply via email to