http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index a1e9d7d..a419d1e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -31,7 +31,9 @@ import org.apache.flink.api.common.accumulators.Histogram;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 /**
@@ -187,8 +189,58 @@ public interface RuntimeContext {
        // 
--------------------------------------------------------------------------------------------
 
        /**
+        * Gets the partitioned state, which is only accessible if the function 
is executed on
+        * a KeyedStream. When interacting with the state only the instance 
bound to the key of the
+        * element currently processed by the function is changed.
+        * Each operator may maintain multiple partitioned states, addressed 
with different names.
+        *
+        * <p>Because the scope of each value is the key of the currently 
processed element,
+        * and the elements are distributed by the Flink runtime, the system 
can transparently
+        * scale out and redistribute the state and KeyedStream.
+        *
+        * <p>The following code example shows how to implement a continuous 
counter that counts
+        * how many times elements of a certain key occur, and emits an updated 
count for that
+        * element on each occurrence.
+        *
+        * <pre>{@code
+        * DataStream<MyType> stream = ...;
+        * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+        *
+        * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
+        *
+        *     private ValueStateDescriptor<Long> countIdentifier =
+        *         new ValueStateDescriptor<>("count", 0L, 
LongSerializer.INSTANCE);
+        *
+        *     private ValueState<Long> state;
+        *
+        *     public void open(Configuration cfg) {
+        *         state = getRuntimeContext().getPartitionedState(countIdentifier);
+        *     }
+        *
+        *     public Tuple2<MyType, Long> map(MyType value) {
+        *         long count = state.value() + 1;
+        *         state.update(count);
+        *         return new Tuple2<>(value, count);
+        *     }
+        * });
+        *
+        * }</pre>
+        *
+        * @param stateDescriptor The StateDescriptor that contains the name 
and type of the
+        *                        state that is being accessed.
+        *
+        * @param <S> The type of the state.
+        *
+        * @return The partitioned state object.
+        *
+        * @throws UnsupportedOperationException Thrown, if no partitioned 
state is available for the
+        *                                       function (function is not part 
of a KeyedStream).
+        */
+       <S extends State> S getPartitionedState(StateDescriptor<S> 
stateDescriptor);
+
+       /**
         * Gets the key/value state, which is only accessible if the function 
is executed on
-        * a KeyedStream. Upon calling {@link OperatorState#value()}, the 
key/value state will
+        * a KeyedStream. Upon calling {@link ValueState#value()}, the 
key/value state will
         * return the value bound to the key of the element currently processed 
by the function.
         * Each operator may maintain multiple key/value states, addressed with 
different names.
         *
@@ -226,11 +278,13 @@ public interface RuntimeContext {
         * the TypeInformation object must be manually passed via 
         * {@link #getKeyValueState(String, TypeInformation, Object)}. 
         * 
+        *
         * @param name The name of the key/value state.
         * @param stateType The class of the type that is stored in the state. 
Used to generate
         *                  serializers for managed memory and checkpointing.
         * @param defaultState The default state value, returned when the state 
is accessed and
         *                     no value has yet been set for the key. May be 
null.
+        *
         * @param <S> The type of the state.
         *
         * @return The key/value state access.
@@ -238,11 +292,12 @@ public interface RuntimeContext {
         * @throws UnsupportedOperationException Thrown, if no key/value state 
is available for the
         *                                       function (function is not part 
os a KeyedStream).
         */
-       <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, 
S defaultState);
+       @Deprecated
+       <S> ValueState<S> getKeyValueState(String name, Class<S> stateType, S 
defaultState);
 
        /**
         * Gets the key/value state, which is only accessible if the function 
is executed on
-        * a KeyedStream. Upon calling {@link OperatorState#value()}, the 
key/value state will
+        * a KeyedStream. Upon calling {@link ValueState#value()}, the 
key/value state will
         * return the value bound to the key of the element currently processed 
by the function.
         * Each operator may maintain multiple key/value states, addressed with 
different names.
         * 
@@ -275,17 +330,19 @@ public interface RuntimeContext {
         *     
         * }</pre>
         * 
+        *
         * @param name The name of the key/value state.
         * @param stateType The type information for the type that is stored in 
the state.
-        *                  Used to create serializers for managed memory and 
checkpoints.   
+        *                  Used to create serializers for managed memory and 
checkpoints.
         * @param defaultState The default state value, returned when the state 
is accessed and
         *                     no value has yet been set for the key. May be 
null.
         * @param <S> The type of the state.
-        *    
+        *
         * @return The key/value state access.
         * 
         * @throws UnsupportedOperationException Thrown, if no key/value state 
is available for the
         *                                       function (function is not part 
os a KeyedStream).
         */
-       <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> 
stateType, S defaultState);
+       @Deprecated
+       <S> ValueState<S> getKeyValueState(String name, TypeInformation<S> 
stateType, S defaultState);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 8f1b6b1..fe18994 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -34,7 +34,9 @@ import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 
@@ -169,13 +171,22 @@ public abstract class AbstractRuntimeUDFContext 
implements RuntimeContext {
        }
 
        @Override
-       public <S> OperatorState<S> getKeyValueState(String name, Class<S> 
stateType, S defaultState) {
+       public <S extends State> S getPartitionedState(StateDescriptor<S> 
stateDescriptor) {
+               throw new UnsupportedOperationException(
+                               "This state is only accessible by functions 
executed on a KeyedStream");
+
+       }
+
+       @Override
+       @Deprecated
+       public <S> ValueState<S> getKeyValueState(String name, Class<S> 
stateType, S defaultState) {
                throw new UnsupportedOperationException(
                                "This state is only accessible by functions 
executed on a KeyedStream");
        }
 
        @Override
-       public <S> OperatorState<S> getKeyValueState(String name, 
TypeInformation<S> stateType, S defaultState) {
+       @Deprecated
+       public <S> ValueState<S> getKeyValueState(String name, 
TypeInformation<S> stateType, S defaultState) {
                throw new UnsupportedOperationException(
                                "This state is only accessible by functions 
executed on a KeyedStream");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
new file mode 100644
index 0000000..f803105
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+/**
+ * {@link State} interface for partitioned list state in Operations.
+ * The state is accessed and modified by user functions, and checkpointed 
consistently
+ * by the system as part of the distributed snapshots.
+ * 
+ * <p>The state is only accessible by functions applied on a KeyedStream. 
The key is
+ * automatically supplied by the system, so the function always sees the value 
mapped to the
+ * key of the current element. That way, the system can handle stream and 
state partitioning
+ * consistently together.
+ * 
+ * @param <T> Type of values that this list state keeps.
+ */
+public interface ListState<T> extends MergingState<T, Iterable<T>> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
new file mode 100644
index 0000000..e391126
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link StateDescriptor} for {@link ListState}. This can be used to create a 
partitioned
+ * list state using
+ * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}.
+ *
+ * @param <T> The type of the values that can be added to the list state.
+ */
+public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>> {
+       private static final long serialVersionUID = 1L;
+
+       /** Serializer for the individual values kept in the list state. */
+       private final TypeSerializer<T> serializer;
+
+       /**
+        * Creates a new {@code ListStateDescriptor} with the given name.
+        *
+        * @param name The (unique) name for the state.
+        * @param serializer {@link TypeSerializer} for the state values.
+        *
+        * @throws NullPointerException Thrown, if {@code name} or {@code serializer} is null.
+        */
+       public ListStateDescriptor(String name, TypeSerializer<T> serializer) {
+               super(requireNonNull(name));
+               this.serializer = requireNonNull(serializer);
+       }
+
+       /**
+        * Creates the {@link ListState} described by this descriptor by delegating to
+        * {@link StateBackend#createListState(ListStateDescriptor)}.
+        */
+       @Override
+       public ListState<T> bind(StateBackend stateBackend) throws Exception {
+               return stateBackend.createListState(this);
+       }
+
+       /**
+        * Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
+        */
+       public TypeSerializer<T> getSerializer() {
+               return serializer;
+       }
+
+       /**
+        * Two descriptors are equal if they are of the same class and have equal
+        * serializers and equal names.
+        */
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
+
+               return serializer.equals(that.serializer) && name.equals(that.name);
+       }
+
+       @Override
+       public int hashCode() {
+               int result = serializer.hashCode();
+               result = 31 * result + name.hashCode();
+               return result;
+       }
+
+       /**
+        * Includes the name, since it is part of the descriptor's identity (see {@link #equals(Object)}).
+        */
+       @Override
+       public String toString() {
+               return "ListStateDescriptor{" +
+                       "name=" + name +
+                       ", serializer=" + serializer +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java
new file mode 100644
index 0000000..f6c0ecb
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import java.io.IOException;
+
+/**
+ * Base interface for partitioned state that supports adding elements and 
inspecting the current
+ * state of merged elements. Elements can either be kept in a buffer 
(list-like) or merged together
+ * into one value.
+ *
+ * <p>The state is accessed and modified by user functions, and checkpointed 
consistently
+ * by the system as part of the distributed snapshots.
+ * 
+ * <p>The state is only accessible by functions applied on a KeyedStream. 
The key is
+ * automatically supplied by the system, so the function always sees the value 
mapped to the
+ * key of the current element. That way, the system can handle stream and 
state partitioning
+ * consistently together.
+ * 
+ * @param <IN> Type of the value that can be added to the state.
+ * @param <OUT> Type of the value that can be retrieved from the state.
+ */
+public interface MergingState<IN, OUT> extends State {
+
+       /**
+        * Returns the current value for the state. When the state is not
+        * partitioned the returned value is the same for all inputs in a given
+        * operator instance. If state partitioning is applied, the value 
returned
+        * depends on the current operator input, as the operator maintains an
+        * independent state for each partition.
+        * 
+        * @return The operator state value corresponding to the current input.
+        * 
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       OUT get() throws Exception ;
+
+       /**
+        * Updates the operator state accessible by {@link #get()} by adding 
the given value
+        * to the list of values. The next time {@link #get()} is called (for 
the same state
+        * partition) the returned state will represent the updated list.
+        * 
+        * @param value
+        *            The value to add to the state.
+        *            
+        * @throws IOException Thrown if the system cannot access the state.
+        */
+       void add(IN value) throws Exception;
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
index ec30f82..32ffa7f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -35,6 +35,7 @@ import java.io.IOException;
  * @param <T> Type of the value in the operator state
  */
 @Public
+@Deprecated
 public interface OperatorState<T> {
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
new file mode 100644
index 0000000..3e2c543
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+/**
+ * {@link State} interface for reducing state. Elements can be added to the 
state, they will
+ * be combined using a reduce function. The current state can be inspected.
+ *
+ * <p>The state is accessed and modified by user functions, and checkpointed 
consistently
+ * by the system as part of the distributed snapshots.
+ * 
+ * <p>The state is only accessible by functions applied on a KeyedStream. 
The key is
+ * automatically supplied by the system, so the function always sees the value 
mapped to the
+ * key of the current element. That way, the system can handle stream and 
state partitioning
+ * consistently together.
+ * 
+ * @param <T> Type of the value in the operator state
+ */
+public interface ReducingState<T> extends MergingState<T, T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
new file mode 100644
index 0000000..7153a05
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link StateDescriptor} for {@link ReducingState}. This can be used to 
create partitioned
+ * reducing state using
+ * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}.
+ *
+ * @param <T> The type of the values that can be added to the reducing state.
+ */
+public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>> {
+       private static final long serialVersionUID = 1L;
+
+       /** Serializer for the values held in the reducing state. */
+       private final TypeSerializer<T> serializer;
+
+       /** Function used to combine the values that are added to the state. */
+       private final ReduceFunction<T> reduceFunction;
+
+       /**
+        * Creates a new {@code ReducingStateDescriptor} with the given name and reduce function.
+        *
+        * @param name The (unique) name for the state.
+        * @param reduceFunction The {@link ReduceFunction} used to combine the values added to the
+        *                       state. Must not be a {@link RichFunction}.
+        * @param serializer {@link TypeSerializer} for the state values.
+        *
+        * @throws NullPointerException Thrown, if any of the arguments is null.
+        * @throws UnsupportedOperationException Thrown, if the reduce function is a RichFunction.
+        */
+       public ReducingStateDescriptor(String name,
+                       ReduceFunction<T> reduceFunction,
+                       TypeSerializer<T> serializer) {
+               super(requireNonNull(name));
+               if (reduceFunction instanceof RichFunction) {
+                       throw new UnsupportedOperationException("ReduceFunction of ReducingState can not be a RichFunction.");
+               }
+               this.serializer = requireNonNull(serializer);
+               // Fail at construction time instead of when the state is first reduced.
+               this.reduceFunction = requireNonNull(reduceFunction);
+       }
+
+       /**
+        * Creates the {@link ReducingState} described by this descriptor by delegating to
+        * {@link StateBackend#createReducingState(ReducingStateDescriptor)}.
+        */
+       @Override
+       public ReducingState<T> bind(StateBackend stateBackend) throws Exception {
+               return stateBackend.createReducingState(this);
+       }
+
+       /**
+        * Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
+        */
+       public TypeSerializer<T> getSerializer() {
+               return serializer;
+       }
+
+       /**
+        * Returns the reduce function to be used for the reducing state.
+        */
+       public ReduceFunction<T> getReduceFunction() {
+               return reduceFunction;
+       }
+
+       /**
+        * Two descriptors are equal if they are of the same class and have equal
+        * serializers and equal names. The reduce function is not compared.
+        */
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               ReducingStateDescriptor<?> that = (ReducingStateDescriptor<?>) o;
+
+               return serializer.equals(that.serializer) && name.equals(that.name);
+       }
+
+       @Override
+       public int hashCode() {
+               int result = serializer.hashCode();
+               result = 31 * result + name.hashCode();
+               return result;
+       }
+
+       /**
+        * Includes the name, since it is part of the descriptor's identity (see {@link #equals(Object)}).
+        */
+       @Override
+       public String toString() {
+               return "ReducingStateDescriptor{" +
+                       "name=" + name +
+                       ", serializer=" + serializer +
+                       ", reduceFunction=" + reduceFunction +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
new file mode 100644
index 0000000..255a735
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.state;
+
+/**
+ * Interface that different types of partitioned state must implement.
+ *
+ * <p>The state is only accessible by functions applied on a KeyedStream. 
The key is
+ * automatically supplied by the system, so the function always sees the value 
mapped to the
+ * key of the current element. That way, the system can handle stream and 
state partitioning
+ * consistently together.
+ */
+public interface State {
+       /**
+        * Clears this state.
+        *
+        * <p>NOTE(review): presumably only the value bound to the key of the element currently
+        * being processed is removed -- confirm against the state backend implementations.
+        */
+       void clear();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
new file mode 100644
index 0000000..d5adf9b
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.state;
+
+/**
+ * The {@code StateBackend} is used by {@link StateDescriptor} instances to 
create actual state
+ * representations.
+ */
+public interface StateBackend {
+
+       /**
+        * Creates and returns a new {@link ValueState}.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <T> The type of the value that the {@code ValueState} can 
store.
+        */
+       <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) 
throws Exception;
+
+       /**
+        * Creates and returns a new {@link ListState}.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <T> The type of the values that the {@code ListState} can 
store.
+        */
+       <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) 
throws Exception;
+
+       /**
+        * Creates and returns a new {@link ReducingState}.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <T> The type of the values that the {@code ReducingState} can 
store.
+        */
+       <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> 
stateDesc) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
new file mode 100644
index 0000000..f62118d
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.state;
+
+import java.io.Serializable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
+ * {@link State} in stateful operations. This contains the name and can create an actual state
+ * object given a {@link StateBackend} using {@link #bind(StateBackend)}.
+ *
+ * <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
+ *
+ * @param <S> The type of the State objects created from this {@code StateDescriptor}.
+ */
+public abstract class StateDescriptor<S extends State> implements Serializable {
+       private static final long serialVersionUID = 1L;
+
+       /** Name that uniquely identifies state created from this StateDescriptor. */
+       protected final String name;
+
+       /**
+        * Create a new {@code StateDescriptor} with the given name.
+        * @param name The name of the {@code StateDescriptor}; a {@code null} name is rejected.
+        */
+       public StateDescriptor(String name) {
+               this.name = requireNonNull(name, "name must not be null");
+       }
+
+       /**
+        * Returns the name of this {@code StateDescriptor}.
+        */
+       public String getName() {
+               return name;
+       }
+
+       /**
+        * Creates a new {@link State} on the given {@link StateBackend}.
+        *
+        * @param stateBackend The {@code StateBackend} on which to create the {@link State}.
+        */
+       public abstract S bind(StateBackend stateBackend) throws Exception;
+
+       // Re-declared abstract so that subclasses are forced to implement it
+       public abstract boolean equals(Object o);
+
+       // Re-declared abstract so that subclasses are forced to implement it
+       public abstract int hashCode();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
new file mode 100644
index 0000000..ddb048f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Public;
+
+import java.io.IOException;
+
+/**
+ * {@link State} interface for partitioned single-value state. The value can be retrieved or
+ * updated.
+ *
+ * <p>The state is accessed and modified by user functions, and checkpointed consistently
+ * by the system as part of the distributed snapshots.
+ *
+ * <p>The state is only accessible by functions applied on a KeyedStream. The key is
+ * automatically supplied by the system, so the function always sees the value mapped to the
+ * key of the current element. That way, the system can handle stream and state partitioning
+ * consistently together.
+ *
+ * @param <T> Type of the value in the state.
+ */
+@Public
+public interface ValueState<T> extends State, OperatorState<T> {
+
+       /**
+        * Returns the current value for the state. When the state is not
+        * partitioned the returned value is the same for all inputs in a given
+        * operator instance. If state partitioning is applied, the value returned
+        * depends on the current operator input, as the operator maintains an
+        * independent state for each partition.
+        *
+        * @return The operator state value corresponding to the current input.
+        *
+        * @throws IOException Thrown if the system cannot access the state.
+        */
+       T value() throws IOException;
+
+       /**
+        * Updates the operator state accessible by {@link #value()} to the given
+        * value. The next time {@link #value()} is called (for the same state
+        * partition) the returned state will represent the updated value. When a
+        * partitioned state is updated with null, the state for the current key
+        * will be removed and the default value is returned on the next access.
+        *
+        * @param value
+        *            The new value for the state; {@code null} removes the state for the current key.
+        *
+        * @throws IOException Thrown if the system cannot access the state.
+        */
+       void update(T value) throws IOException;
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
new file mode 100644
index 0000000..bcfa46f
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link StateDescriptor} for {@link ValueState}. This can be used to create partitioned
+ * value state using
+ * {@link org.apache.flink.api.common.functions.RuntimeContext#getPartitionedState(StateDescriptor)}.
+ *
+ * @param <T> The type of the values that the value state can hold.
+ */
+public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>> {
+       private static final long serialVersionUID = 1L;
+
+       private transient T defaultValue;
+
+       private final TypeSerializer<T> serializer;
+
+       /**
+        * Creates a new {@code ValueStateDescriptor} with the given name and default value.
+        *
+        * @param name The (unique) name for the state.
+        * @param defaultValue The default value that will be set when requesting state without setting
+        *                     a value before.
+        * @param serializer {@link TypeSerializer} for the state values.
+        */
+       public ValueStateDescriptor(String name, T defaultValue, TypeSerializer<T> serializer) {
+               super(requireNonNull(name));
+               this.defaultValue = defaultValue;
+               this.serializer = requireNonNull(serializer);
+       }
+
+       /** Writes the transient default value via the serializer as: presence flag, byte length, bytes. */
+       private void writeObject(final ObjectOutputStream out) throws IOException {
+               out.defaultWriteObject();
+
+               if (defaultValue == null) {
+                       // we don't have a default value
+                       out.writeBoolean(false);
+               } else {
+                       out.writeBoolean(true);
+
+                       ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                       DataOutputViewStreamWrapper outView =
+                                       new DataOutputViewStreamWrapper(new DataOutputStream(baos));
+
+                       try {
+                               serializer.serialize(defaultValue, outView);
+                       } catch (IOException ioe) {
+                               throw new RuntimeException("Unable to serialize default value of type " +
+                                               defaultValue.getClass().getSimpleName() + ".", ioe);
+                       }
+
+                       outView.close();
+
+                       out.writeInt(baos.size());
+                       out.write(baos.toByteArray());
+               }
+       }
+
+       /** Counterpart of {@code writeObject}: restores the default value, or {@code null} if none was written. */
+       private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+               in.defaultReadObject();
+
+               boolean hasDefaultValue = in.readBoolean();
+
+               if (hasDefaultValue) {
+                       int size = in.readInt();
+                       byte[] buffer = new byte[size];
+
+                       // read(byte[]) may return fewer bytes than requested even though more
+                       // follow; readFully blocks until filled or throws EOFException.
+                       in.readFully(buffer);
+
+                       ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+                       DataInputViewStreamWrapper inView =
+                                       new DataInputViewStreamWrapper(new DataInputStream(bais));
+                       defaultValue = serializer.deserialize(inView);
+               } else {
+                       defaultValue = null;
+               }
+       }
+
+       @Override
+       public ValueState<T> bind(StateBackend stateBackend) throws Exception {
+               return stateBackend.createValueState(this);
+       }
+
+       /**
+        * Returns the default value, passed through {@code serializer.copy}, or {@code null} if none is set.
+        */
+       public T getDefaultValue() {
+               if (defaultValue != null) {
+                       return serializer.copy(defaultValue);
+               } else {
+                       return null;
+               }
+       }
+
+       /**
+        * Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
+        */
+       public TypeSerializer<T> getSerializer() {
+               return serializer;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               ValueStateDescriptor<?> that = (ValueStateDescriptor<?>) o;
+
+               // compares serializer and name only; the default value is not part of equality
+               return serializer.equals(that.serializer) && name.equals(that.name);
+       }
+
+       @Override
+       public int hashCode() {
+               int result = serializer.hashCode();
+               result = 31 * result + name.hashCode();
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return "ValueStateDescriptor{" +
+                               "name=" + name +
+                               ", defaultValue=" + defaultValue +
+                               ", serializer=" + serializer +
+                               '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
index 68842d6..a9d5cd6 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -33,7 +33,7 @@ public abstract class TypeSerializerSingleton<T> extends 
TypeSerializer<T>{
 
        @Override
        public int hashCode() {
-               return TypeSerializerSingleton.class.hashCode();
+               return this.getClass().hashCode();
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index baa4af8..b86830d 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
-import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -108,7 +108,7 @@ public class SessionWindowing {
                @Override
                public TriggerResult onElement(Tuple3<String, Long, Integer> 
element, long timestamp, GlobalWindow window, TriggerContext ctx) throws 
Exception {
 
-                       OperatorState<Long> lastSeenState = 
ctx.getKeyValueState("last-seen", 1L);
+                       ValueState<Long> lastSeenState = 
ctx.getKeyValueState("last-seen", 1L);
                        Long lastSeen = lastSeenState.value();
 
                        Long timeSinceLastEvent = timestamp - lastSeen;
@@ -127,7 +127,7 @@ public class SessionWindowing {
 
                @Override
                public TriggerResult onEventTime(long time, GlobalWindow 
window, TriggerContext ctx) throws Exception {
-                       OperatorState<Long> lastSeenState = 
ctx.getKeyValueState("last-seen", 1L);
+                       ValueState<Long> lastSeenState = 
ctx.getKeyValueState("last-seen", 1L);
                        Long lastSeen = lastSeenState.value();
 
                        if (time - lastSeen >= sessionTimeout) {

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index 49dfc21..942a3c9 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -25,11 +25,14 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.state.StateBackendTestBase;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.hadoop.conf.Configuration;
@@ -55,7 +58,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class FileStateBackendTest {
+public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> 
{
        
        private static File TEMP_DIR;
        
@@ -99,6 +102,20 @@ public class FileStateBackendTest {
                catch (Exception ignored) {}
        }
 
+       private URI stateBaseURI;
+
+       @Override
+       protected FsStateBackend getStateBackend() throws Exception {
+               stateBaseURI = new URI(HDFS_ROOT_URI + 
UUID.randomUUID().toString());
+               return new FsStateBackend(stateBaseURI);
+
+       }
+
+       @Override
+       protected void cleanup() throws Exception {
+               FileSystem.get(stateBaseURI).delete(new Path(stateBaseURI), 
true);
+       }
+
        // 
------------------------------------------------------------------------
        //  Tests
        // 
------------------------------------------------------------------------
@@ -128,7 +145,7 @@ public class FileStateBackendTest {
                                // supreme!
                        }
 
-                       backend.initializeForJob(new DummyEnvironment("test", 
1, 0));
+                       backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "dummy", IntSerializer.INSTANCE);
                        assertNotNull(backend.getCheckpointDirectory());
 
                        Path checkpointDir = backend.getCheckpointDirectory();
@@ -149,9 +166,8 @@ public class FileStateBackendTest {
        @Test
        public void testSerializableState() {
                try {
-                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(
-                               new FsStateBackend(randomHdfsFileUri(), 40));
-                       backend.initializeForJob(new DummyEnvironment("test", 
1, 0));
+                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri(), 
40));
+                       backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "dummy", IntSerializer.INSTANCE);
 
                        Path checkpointDir = backend.getCheckpointDirectory();
 
@@ -183,9 +199,8 @@ public class FileStateBackendTest {
        @Test
        public void testStateOutputStream() {
                try {
-                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(
-                               new FsStateBackend(randomHdfsFileUri(), 15));
-                       backend.initializeForJob(new DummyEnvironment("test", 
1, 0));
+                       FsStateBackend backend = 
CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri(), 
15));
+                       backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "dummy", IntSerializer.INSTANCE);
 
                        Path checkpointDir = backend.getCheckpointDirectory();
 
@@ -219,14 +234,14 @@ public class FileStateBackendTest {
 
                        // use with try-with-resources
                        StreamStateHandle handle4;
-                       try (StateBackend.CheckpointStateOutputStream stream4 =
+                       try (AbstractStateBackend.CheckpointStateOutputStream 
stream4 =
                                                 
backend.createCheckpointStateOutputStream(checkpointId, 
System.currentTimeMillis())) {
                                stream4.write(state4);
                                handle4 = stream4.closeAndGetHandle();
                        }
 
                        // close before accessing handle
-                       StateBackend.CheckpointStateOutputStream stream5 =
+                       AbstractStateBackend.CheckpointStateOutputStream 
stream5 =
                                        
backend.createCheckpointStateOutputStream(checkpointId, 
System.currentTimeMillis());
                        stream5.write(state4);
                        stream5.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
deleted file mode 100644
index 23703b3..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Base class for key/value state implementations that are backed by a regular 
heap hash map. The
- * concrete implementations define how the state is checkpointed.
- * 
- * @param <K> The type of the key.
- * @param <V> The type of the value.
- * @param <Backend> The type of the backend that snapshots this key/value 
state.
- */
-public abstract class AbstractHeapKvState<K, V, Backend extends 
StateBackend<Backend>> implements KvState<K, V, Backend> {
-
-       /** Map containing the actual key/value pairs */
-       private final HashMap<K, V> state;
-       
-       /** The serializer for the keys */
-       private final TypeSerializer<K> keySerializer;
-
-       /** The serializer for the values */
-       private final TypeSerializer<V> valueSerializer;
-       
-       /** The value that is returned when no other value has been associated 
with a key, yet */
-       private final V defaultValue;
-       
-       /** The current key, which the next value methods will refer to */
-       private K currentKey;
-       
-       /**
-        * Creates a new empty key/value state.
-        * 
-        * @param keySerializer The serializer for the keys.
-        * @param valueSerializer The serializer for the values.
-        * @param defaultValue The value that is returned when no other value 
has been associated with a key, yet.
-        */
-       protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
-                                                                       
TypeSerializer<V> valueSerializer,
-                                                                       V 
defaultValue) {
-               this(keySerializer, valueSerializer, defaultValue, new 
HashMap<K, V>());
-       }
-
-       /**
-        * Creates a new key/value state for the given hash map of key/value 
pairs.
-        * 
-        * @param keySerializer The serializer for the keys.
-        * @param valueSerializer The serializer for the values.
-        * @param defaultValue The value that is returned when no other value 
has been associated with a key, yet.
-        * @param state The state map to use in this kev/value state. May 
contain initial state.   
-        */
-       protected AbstractHeapKvState(TypeSerializer<K> keySerializer,
-                                                                       
TypeSerializer<V> valueSerializer,
-                                                                       V 
defaultValue,
-                                                                       
HashMap<K, V> state) {
-               this.state = requireNonNull(state);
-               this.keySerializer = requireNonNull(keySerializer);
-               this.valueSerializer = requireNonNull(valueSerializer);
-               this.defaultValue = defaultValue;
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public V value() {
-               V value = state.get(currentKey);
-               return value != null ? value : 
-                               (defaultValue == null ? null : 
valueSerializer.copy(defaultValue));
-       }
-
-       @Override
-       public void update(V value) {
-               if (value != null) {
-                       state.put(currentKey, value);
-               }
-               else {
-                       state.remove(currentKey);
-               }
-       }
-
-       @Override
-       public void setCurrentKey(K currentKey) {
-               this.currentKey = currentKey;
-       }
-
-       @Override
-       public int size() {
-               return state.size();
-       }
-
-       @Override
-       public void dispose() {
-               state.clear();
-       }
-
-       /**
-        * Gets the serializer for the keys.
-        * @return The serializer for the keys.
-        */
-       public TypeSerializer<K> getKeySerializer() {
-               return keySerializer;
-       }
-
-       /**
-        * Gets the serializer for the values.
-        * @return The serializer for the values.
-        */
-       public TypeSerializer<V> getValueSerializer() {
-               return valueSerializer;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  checkpointing utilities
-       // 
------------------------------------------------------------------------
-       
-       protected void writeStateToOutputView(final DataOutputView out) throws 
IOException {
-               for (Map.Entry<K, V> entry : state.entrySet()) {
-                       keySerializer.serialize(entry.getKey(), out);
-                       valueSerializer.serialize(entry.getValue(), out);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
new file mode 100644
index 0000000..206be64
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapState.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for partitioned {@link State} implementations, such as 
{@link ListState}, that are backed by a regular
+ * heap hash map. The concrete implementations define how the state is 
checkpointed.
+ * 
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <SV> The type of the values in the state.
+ * @param <S> The type of State
+ * @param <SD> The type of StateDescriptor for the State S
+ * @param <Backend> The type of the backend that snapshots this key/value 
state.
+ */
+public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends 
StateDescriptor<S>, Backend extends AbstractStateBackend>
+               implements KvState<K, N, S, SD, Backend>, State {
+
+       /** Map containing the actual key/value pairs */
+       protected final HashMap<N, Map<K, SV>> state;
+
+       /** Serializer for the state value. The state value could be a List<V>, 
for example. */
+       protected final TypeSerializer<SV> stateSerializer;
+
+       /** The serializer for the keys */
+       protected final TypeSerializer<K> keySerializer;
+
+       /** The serializer for the namespace */
+       protected final TypeSerializer<N> namespaceSerializer;
+
+       /** This holds the name of the state and can create an initial default 
value for the state. */
+       protected final SD stateDesc;
+
+       /** The current key, which the next value methods will refer to */
+       protected K currentKey;
+
+       /** The current namespace, which the access methods will refer to. */
+       protected N currentNamespace = null;
+
+       /** Cache of the key/value map for the current namespace. */
+       protected Map<K, SV> currentNSState;
+
+       /**
+        * Creates a new empty key/value state.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. This contains 
name
+        *                           and can create a default state value.
+        */
+       protected AbstractHeapState(TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               TypeSerializer<SV> stateSerializer,
+               SD stateDesc) {
+               this(keySerializer, namespaceSerializer, stateSerializer, 
stateDesc, new HashMap<N, Map<K, SV>>());
+       }
+
+       /**
+        * Creates a new key/value state for the given hash map of key/value 
pairs.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param stateDesc The state identifier for the state. This contains 
name
+        *                           and can create a default state value.
+        * @param state The state map to use in this key/value state. May 
contain initial state.
+        */
+       protected AbstractHeapState(TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               TypeSerializer<SV> stateSerializer,
+               SD stateDesc,
+               HashMap<N, Map<K, SV>> state) {
+               this.state = requireNonNull(state);
+               this.keySerializer = requireNonNull(keySerializer);
+               this.namespaceSerializer = requireNonNull(namespaceSerializer);
+               this.stateSerializer = stateSerializer;
+               this.stateDesc = stateDesc;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public final void clear() {
+               if (currentNSState != null) {
+                       currentNSState.remove(currentKey);
+                       if (currentNSState.isEmpty()) {
+                               state.remove(currentNamespace);
+                               currentNSState = null;
+                       }
+               }
+       }
+
+       @Override
+       public final void setCurrentKey(K currentKey) {
+               this.currentKey = currentKey;
+       }
+
+       @Override
+       public final void setCurrentNamespace(N namespace) {
+               if (namespace != null && 
namespace.equals(this.currentNamespace)) {
+                       return;
+               }
+               this.currentNamespace = namespace;
+               this.currentNSState = state.get(currentNamespace);
+       }
+
+       /**
+        * Returns the number of all state pairs in this state, across 
namespaces.
+        */
+       protected final int size() {
+               int size = 0;
+               for (Map<K, SV> namespace: state.values()) {
+                       size += namespace.size();
+               }
+               return size;
+       }
+
+       @Override
+       public void dispose() {
+               state.clear();
+       }
+
+       /**
+        * Gets the serializer for the keys.
+        *
+        * @return The serializer for the keys.
+        */
+       public final TypeSerializer<K> getKeySerializer() {
+               return keySerializer;
+       }
+
+       /**
+        * Gets the serializer for the namespace.
+        *
+        * @return The serializer for the namespace.
+        */
+       public final TypeSerializer<N> getNamespaceSerializer() {
+               return namespaceSerializer;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
new file mode 100644
index 0000000..958b4dc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateBackend;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.execution.Environment;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A state backend defines how state is stored and snapshotted during 
checkpoints.
+ */
+public abstract class AbstractStateBackend implements java.io.Serializable {
+       
+       private static final long serialVersionUID = 4620413814639220247L;
+
+       protected transient TypeSerializer<?> keySerializer;
+
+       protected transient ClassLoader userCodeClassLoader;
+
+       protected transient Object currentKey;
+
+       /** For efficient access in setCurrentKey() */
+       private transient KvState<?, ?, ?, ?, ?>[] keyValueStates;
+
+       /** So that we can give out state when the user uses the same name. */
+       private transient HashMap<String, KvState<?, ?, ?, ?, ?>> 
keyValueStatesByName;
+
+       /** For caching the last accessed partitioned state */
+       private transient String lastName;
+
+       @SuppressWarnings("rawtypes")
+       private transient KvState lastState;
+
+       // 
------------------------------------------------------------------------
+       //  initialization and cleanup
+       // 
------------------------------------------------------------------------
+
+       /**
+        * This method is called by the task upon deployment to initialize the 
state backend for
+        * data for a specific job.
+        *
+        * @param env The {@link Environment} of the task that instantiated the 
state backend
+        * @param operatorIdentifier Unique identifier for naming states 
created by this backend
+        * @throws Exception Overwritten versions of this method may throw 
exceptions, in which
+        *                   case the job that uses the state backend is 
considered failed during
+        *                   deployment.
+        */
+       public void initializeForJob(Environment env,
+               String operatorIdentifier,
+               TypeSerializer<?> keySerializer) throws Exception {
+               this.userCodeClassLoader = env.getUserClassLoader();
+               this.keySerializer = keySerializer;
+       }
+
+       /**
+        * Disposes all state associated with the current job.
+        *
+        * @throws Exception Exceptions may occur during disposal of the state 
and should be forwarded.
+        */
+       public abstract void disposeAllStateForCurrentJob() throws Exception;
+
+       /**
+        * Closes the state backend, releasing all internal resources, but does 
not delete any persistent
+        * checkpoint data.
+        *
+        * @throws Exception Exceptions can be forwarded and will be logged by 
the system
+        */
+       public abstract void close() throws Exception;
+
+       public void dispose() {
+               if (keyValueStates != null) {
+                       for (KvState<?, ?, ?, ?, ?> state : keyValueStates) {
+                               state.dispose();
+                       }
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  key/value state
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates and returns a new {@link ValueState}.
+        *
+        * @param namespaceSerializer TypeSerializer for the state namespace.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <N> The type of the namespace.
+        * @param <T> The type of the value that the {@code ValueState} can 
store.
+        */
+       abstract protected <N, T> ValueState<T> 
createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> 
stateDesc) throws Exception;
+
+       /**
+        * Creates and returns a new {@link ListState}.
+        *
+        * @param namespaceSerializer TypeSerializer for the state namespace.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <N> The type of the namespace.
+        * @param <T> The type of the values that the {@code ListState} can 
store.
+        */
+       abstract protected <N, T> ListState<T> 
createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> 
stateDesc) throws Exception;
+
+       /**
+        * Creates and returns a new {@link ReducingState}.
+        *
+        * @param namespaceSerializer TypeSerializer for the state namespace.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <N> The type of the namespace.
+        * @param <T> The type of the values that the {@code ReducingState} can 
store.
+        */
+       abstract protected <N, T> ReducingState<T> 
createReducingState(TypeSerializer<N> namespaceSerializer, 
ReducingStateDescriptor<T> stateDesc) throws Exception;
+
+       /**
+        * Sets the current key that is used for partitioned state.
+        * @param currentKey The current key.
+        */
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public void setCurrentKey(Object currentKey) {
+               this.currentKey = currentKey;
+               if (keyValueStates != null) {
+                       for (KvState kv : keyValueStates) {
+                               kv.setCurrentKey(currentKey);
+                       }
+               }
+       }
+
+       public Object getCurrentKey() {
+               return currentKey;
+       }
+
+       /**
+        * Creates or retrieves a partitioned state backed by this state 
backend.
+        *
+        * @param stateDescriptor The state identifier for the state. This 
contains name
+        *                           and can create a default state value.
+        * @param <K> The type of the key.
+        * @param <N> The type of the namespace.
+        * @param <S> The type of the state.
+        *
+        * @return A new key/value state backed by this backend.
+        *
+        * @throws Exception Exceptions may occur during initialization of the 
state and should be forwarded.
+        */
+       @SuppressWarnings({"rawtypes", "unchecked"})
+       public <K, N, S extends State> S getPartitionedState(final N namespace, 
final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S> 
stateDescriptor) throws Exception {
+
+               if (keySerializer == null) {
+                       throw new Exception("State key serializer has not been 
configured in the config. " +
+                                       "This operation cannot use partitioned 
state.");
+               }
+
+               if (keyValueStatesByName == null) {
+                       keyValueStatesByName = new HashMap<>();
+               }
+
+               if (lastName != null && 
lastName.equals(stateDescriptor.getName())) {
+                       lastState.setCurrentNamespace(namespace);
+                       return (S) lastState;
+               }
+
+               KvState<?, ?, ?, ?, ?> previous = 
keyValueStatesByName.get(stateDescriptor.getName());
+               if (previous != null) {
+                       lastState = previous;
+                       lastState.setCurrentNamespace(namespace);
+                       lastName = stateDescriptor.getName();
+                       return (S) previous;
+               }
+
+               // create a new blank key/value state
+               S kvstate = stateDescriptor.bind(new StateBackend() {
+                       @Override
+                       public <T> ValueState<T> 
createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
+                               return 
AbstractStateBackend.this.createValueState(namespaceSerializer, stateDesc);
+                       }
+
+                       @Override
+                       public <T> ListState<T> 
createListState(ListStateDescriptor<T> stateDesc) throws Exception {
+                               return 
AbstractStateBackend.this.createListState(namespaceSerializer, stateDesc);
+                       }
+
+                       @Override
+                       public <T> ReducingState<T> 
createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
+                               return 
AbstractStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
+                       }
+               });
+
+               keyValueStatesByName.put(stateDescriptor.getName(), (KvState) 
kvstate);
+               keyValueStates = keyValueStatesByName.values().toArray(new 
KvState[keyValueStatesByName.size()]);
+
+               lastName = stateDescriptor.getName();
+               lastState = (KvState<?, ?, ?, ?, ?>) kvstate;
+
+               ((KvState) kvstate).setCurrentKey(currentKey);
+               ((KvState) kvstate).setCurrentNamespace(namespace);
+
+               return kvstate;
+       }
+
+       public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> 
snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
+               if (keyValueStates != null) {
+                       HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> 
snapshots = new HashMap<>(keyValueStatesByName.size());
+
+                       for (Map.Entry<String, KvState<?, ?, ?, ?, ?>> entry : 
keyValueStatesByName.entrySet()) {
+                               KvStateSnapshot<?, ?, ?, ?, ?> snapshot = 
entry.getValue().snapshot(checkpointId, timestamp);
+                               snapshots.put(entry.getKey(), snapshot);
+                       }
+                       return snapshots;
+               }
+
+               return null;
+       }
+
+       public void notifyOfCompletedCheckpoint(long checkpointId) throws 
Exception {
+               // We check whether the KvStates require notifications
+               if (keyValueStates != null) {
+                       for (KvState<?, ?, ?, ?, ?> kvstate : keyValueStates) {
+                               if (kvstate instanceof CheckpointListener) {
+                                       ((CheckpointListener) 
kvstate).notifyCheckpointComplete(checkpointId);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Injects K/V state snapshots for lazy restore.
+        * @param keyValueStateSnapshots The Map of snapshots
+        */
+       @SuppressWarnings("unchecked,rawtypes")
+       public final void injectKeyValueStateSnapshots(HashMap<String, 
KvStateSnapshot> keyValueStateSnapshots, long recoveryTimestamp) throws 
Exception {
+               if (keyValueStateSnapshots != null) {
+                       if (keyValueStatesByName == null) {
+                               keyValueStatesByName = new HashMap<>();
+                       }
+
+                       for (Map.Entry<String, KvStateSnapshot> state : 
keyValueStateSnapshots.entrySet()) {
+                               KvState kvState = 
state.getValue().restoreState(this,
+                                       keySerializer,
+                                       userCodeClassLoader,
+                                       recoveryTimestamp);
+                               keyValueStatesByName.put(state.getKey(), 
kvState);
+                       }
+                       keyValueStates = 
keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  storing state for a checkpoint
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates an output stream that writes into the state of the given 
checkpoint. When the stream
+        * is closed, it returns a state handle that can retrieve the state 
back.
+        *
+        * @param checkpointID The ID of the checkpoint.
+        * @param timestamp The timestamp of the checkpoint.
+        * @return An output stream that writes state for the given checkpoint.
+        *
+        * @throws Exception Exceptions may occur while creating the stream and 
should be forwarded.
+        */
+       public abstract CheckpointStateOutputStream 
createCheckpointStateOutputStream(
+                       long checkpointID, long timestamp) throws Exception;
+
+       /**
+        * Creates a {@link DataOutputView} stream that writes into the state 
of the given checkpoint.
+        * When the stream is closed, it returns a state handle that can 
retrieve the state back.
+        *
+        * @param checkpointID The ID of the checkpoint.
+        * @param timestamp The timestamp of the checkpoint.
+        * @return A DataOutputView stream that writes state for the given 
checkpoint.
+        *
+        * @throws Exception Exceptions may occur while creating the stream and 
should be forwarded.
+        */
+       public CheckpointStateOutputView createCheckpointStateOutputView(
+                       long checkpointID, long timestamp) throws Exception {
+               return new 
CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, 
timestamp));
+       }
+
+       /**
+        * Writes the given state into the checkpoint, and returns a handle 
that can retrieve the state back.
+        *
+        * @param state The state to be checkpointed.
+        * @param checkpointID The ID of the checkpoint.
+        * @param timestamp The timestamp of the checkpoint.
+        * @param <S> The type of the state.
+        *
+        * @return A state handle that can retrieve the checkpointed state.
+        *
+        * @throws Exception Exceptions may occur during serialization / 
storing the state and should be forwarded.
+        */
+       public abstract <S extends Serializable> StateHandle<S> 
checkpointStateSerializable(
+                       S state, long checkpointID, long timestamp) throws 
Exception;
+
+
+       // 
------------------------------------------------------------------------
+       //  Checkpoint state output stream
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A dedicated output stream that produces a {@link StreamStateHandle} 
when closed.
+        */
+       public static abstract class CheckpointStateOutputStream extends 
OutputStream {
+
+               /**
+                * Closes the stream and gets a state handle that can create an 
input stream
+                * producing the data written to this stream.
+                *
+                * @return A state handle that can create an input stream 
producing the data written to this stream.
+                * @throws IOException Thrown, if the stream cannot be closed.
+                */
+               public abstract StreamStateHandle closeAndGetHandle() throws 
IOException;
+       }
+
+       /**
+        * A dedicated DataOutputView stream that produces a {@code 
StateHandle<DataInputView>} when closed.
+        */
+       public static final class CheckpointStateOutputView extends 
DataOutputViewStreamWrapper {
+
+               private final CheckpointStateOutputStream out;
+
+               public CheckpointStateOutputView(CheckpointStateOutputStream 
out) {
+                       super(out);
+                       this.out = out;
+               }
+
+               /**
+                * Closes the stream and gets a state handle that can create a 
DataInputView
+                * producing the data written to this stream.
+                *
+                * @return A state handle that can create an input stream 
producing the data written to this stream.
+                * @throws IOException Thrown, if the stream cannot be closed.
+                */
+               public StateHandle<DataInputView> closeAndGetHandle() throws 
IOException {
+                       return new DataInputViewHandle(out.closeAndGetHandle());
+               }
+
+               @Override
+               public void close() throws IOException {
+                       out.close();
+               }
+       }
+
+       /**
+        * Simple state handle that resolves a {@link DataInputView} from a 
StreamStateHandle.
+        */
+       private static final class DataInputViewHandle implements 
StateHandle<DataInputView> {
+
+               private static final long serialVersionUID = 
2891559813513532079L;
+
+               private final StreamStateHandle stream;
+
+               private DataInputViewHandle(StreamStateHandle stream) {
+                       this.stream = stream;
+               }
+
+               @Override
+               public DataInputView getState(ClassLoader userCodeClassLoader) 
throws Exception {
+                       return new 
DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
+               }
+
+               @Override
+               public void discardState() throws Exception {
+                       stream.discardState();
+               }
+
+               @Override
+               public long getStateSize() throws Exception {
+                       return stream.getStateSize();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
new file mode 100644
index 0000000..3bad8b0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+@SuppressWarnings("ForLoopReplaceableByForEach")
+final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> 
{
+
+       private static final long serialVersionUID = 1119562170939152304L;
+
+       private final TypeSerializer<T> elementSerializer;
+
+       public ArrayListSerializer(TypeSerializer<T> elementSerializer) {
+               this.elementSerializer = elementSerializer;
+       }
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public TypeSerializer<ArrayList<T>> duplicate() {
+               TypeSerializer<T> duplicateElement = 
elementSerializer.duplicate();
+               return duplicateElement == elementSerializer ? this : new 
ArrayListSerializer<T>(duplicateElement);
+       }
+
+       @Override
+       public ArrayList<T> createInstance() {
+               return new ArrayList<>();
+       }
+
+       @Override
+       public ArrayList<T> copy(ArrayList<T> from) {
+               ArrayList<T> newList = new ArrayList<>(from.size());
+               for (int i = 0; i < from.size(); i++) {
+                       newList.add(elementSerializer.copy(from.get(i)));
+               }
+               return newList;
+       }
+
+       @Override
+       public ArrayList<T> copy(ArrayList<T> from, ArrayList<T> reuse) {
+               return copy(from);
+       }
+
+       @Override
+       public int getLength() {
+               return -1; // var length
+       }
+
+       @Override
+       public void serialize(ArrayList<T> list, DataOutputView target) throws 
IOException {
+               final int size = list.size();
+               target.writeInt(size);
+               for (int i = 0; i < size; i++) {
+                       elementSerializer.serialize(list.get(i), target);
+               }
+       }
+
+       @Override
+       public ArrayList<T> deserialize(DataInputView source) throws 
IOException {
+               final int size = source.readInt();
+               final ArrayList<T> list = new ArrayList<>(size);
+               for (int i = 0; i < size; i++) {
+                       list.add(elementSerializer.deserialize(source));
+               }
+               return list;
+       }
+
+       @Override
+       public ArrayList<T> deserialize(ArrayList<T> reuse, DataInputView 
source) throws IOException {
+               return deserialize(source);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               // copy number of elements
+               final int num = source.readInt();
+               target.writeInt(num);
+               for (int i = 0; i < num; i++) {
+                       elementSerializer.copy(source, target);
+               }
+       }
+
+       // --------------------------------------------------------------------
+
+       @Override
+       public boolean equals(Object obj) {
+               return obj == this ||
+                       (obj != null && obj.getClass() == getClass() &&
+                               
elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer));
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return true;
+       }
+
+       @Override
+       public int hashCode() {
+               return elementSerializer.hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
new file mode 100644
index 0000000..1f18805
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+/**
+ * This interface must be implemented by functions/operations that want to receive
+ * a commit notification once a checkpoint has been completely acknowledged by all
+ * participants.
+ */
+public interface CheckpointListener {
+
+       /**
+        * This method is called as a notification once a distributed checkpoint has
+        * been completed.
+        *
+        * <p>Note that any exception thrown during this method will not cause the
+        * checkpoint to fail anymore.
+        *
+        * @param checkpointId The ID of the checkpoint that has been completed.
+        * @throws Exception Implementations may throw any exception; as noted above,
+        *                   it does not fail the checkpoint.
+        */
+       void notifyCheckpointComplete(long checkpointId) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/caf46728/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
new file mode 100644
index 0000000..c20962f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Generic implementation of {@link ListState} based on a wrapped {@link 
ValueState}.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values stored in this {@code ListState}.
+ * @param <Backend> The type of {@link AbstractStateBackend} that manages this 
{@code KvState}.
+ * @param <W> Generic type that extends both the underlying {@code ValueState} 
and {@code KvState}.
+ */
+public class GenericListState<K, N, T, Backend extends AbstractStateBackend, W 
extends ValueState<ArrayList<T>> & KvState<K, N, ValueState<ArrayList<T>>, 
ValueStateDescriptor<ArrayList<T>>, Backend>>
+       implements ListState<T>, KvState<K, N, ListState<T>, 
ListStateDescriptor<T>, Backend> {
+
+       private final W wrappedState;
+
+       @SuppressWarnings("unchecked")
+       public GenericListState(ValueState<ArrayList<T>> wrappedState) {
+               if (!(wrappedState instanceof KvState)) {
+                       throw new IllegalArgumentException("Wrapped state must 
be a KvState.");
+               }
+               this.wrappedState = (W) wrappedState;
+       }
+
+       @Override
+       public void setCurrentKey(K key) {
+               wrappedState.setCurrentKey(key);
+       }
+
+       @Override
+       public void setCurrentNamespace(N namespace) {
+               wrappedState.setCurrentNamespace(namespace);
+       }
+
+       @Override
+       public KvStateSnapshot<K, N, ListState<T>, ListStateDescriptor<T>, 
Backend> snapshot(
+               long checkpointId,
+               long timestamp) throws Exception {
+               KvStateSnapshot<K, N, ValueState<ArrayList<T>>, 
ValueStateDescriptor<ArrayList<T>>, Backend> wrappedSnapshot = 
wrappedState.snapshot(
+                       checkpointId,
+                       timestamp);
+               return new Snapshot<>(wrappedSnapshot);
+       }
+
+       @Override
+       public void dispose() {
+               wrappedState.dispose();
+       }
+
+       @Override
+       public Iterable<T> get() throws Exception {
+               ArrayList<T> result = wrappedState.value();
+               if (result == null) {
+                       return Collections.emptyList();
+               }
+               return result;
+       }
+
+       @Override
+       public void add(T value) throws Exception {
+               ArrayList<T> currentValue = wrappedState.value();
+               if (currentValue == null) {
+                       currentValue = new ArrayList<>();
+                       currentValue.add(value);
+                       wrappedState.update(currentValue);
+               } else {
+                       currentValue.add(value);
+                       wrappedState.update(currentValue);
+               }
+       }
+
+       @Override
+       public void clear() {
+               wrappedState.clear();
+       }
+
+       private static class Snapshot<K, N, T, Backend extends 
AbstractStateBackend> implements KvStateSnapshot<K, N, ListState<T>, 
ListStateDescriptor<T>, Backend> {
+               private static final long serialVersionUID = 1L;
+
+               private final KvStateSnapshot<K, N, ValueState<ArrayList<T>>, 
ValueStateDescriptor<ArrayList<T>>, Backend> wrappedSnapshot;
+
+               public Snapshot(KvStateSnapshot<K, N, ValueState<ArrayList<T>>, 
ValueStateDescriptor<ArrayList<T>>, Backend> wrappedSnapshot) {
+                       this.wrappedSnapshot = wrappedSnapshot;
+               }
+
+               @Override
+               @SuppressWarnings("unchecked")
+               public KvState<K, N, ListState<T>, ListStateDescriptor<T>, 
Backend> restoreState(
+                       Backend stateBackend,
+                       TypeSerializer<K> keySerializer,
+                       ClassLoader classLoader,
+                       long recoveryTimestamp) throws Exception {
+                       return new GenericListState((ValueState<T>) 
wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, 
recoveryTimestamp));
+               }
+
+               @Override
+               public void discardState() throws Exception {
+                       wrappedSnapshot.discardState();
+               }
+
+               @Override
+               public long getStateSize() throws Exception {
+                       return wrappedSnapshot.getStateSize();
+               }
+       }
+}

Reply via email to