http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index 9552325..f0eb53e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.Preconditions;
 
 import java.io.ByteArrayOutputStream;
@@ -39,8 +40,8 @@ import java.util.Map;
  * @param <V> The type of the value.
  */
 public class HeapListState<K, N, V>
-               extends AbstractHeapState<K, N, ArrayList<V>, ListState<V>, 
ListStateDescriptor<V>>
-               implements ListState<V> {
+               extends AbstractHeapMergingState<K, N, V, Iterable<V>, 
ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
+               implements InternalListState<N, V> {
 
        /**
         * Creates a new key/value state for the given hash map of key/value 
pairs.
@@ -59,6 +60,10 @@ public class HeapListState<K, N, V>
                super(backend, stateDesc, stateTable, keySerializer, 
namespaceSerializer);
        }
 
+       // 
------------------------------------------------------------------------
+       //  state access
+       // 
------------------------------------------------------------------------
+
        @Override
        public Iterable<V> get() {
                Preconditions.checkState(currentNamespace != null, "No 
namespace set.");
@@ -154,4 +159,14 @@ public class HeapListState<K, N, V>
 
                return baos.toByteArray();
        }
+
+       // 
------------------------------------------------------------------------
+       //  state merging
+       // 
------------------------------------------------------------------------
+
+       @Override
+       protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) {
+               a.addAll(b);
+               return a;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
index 37aa812..7804cb4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -37,8 +38,8 @@ import java.util.Map;
  * @param <V> The type of the value.
  */
 public class HeapReducingState<K, N, V>
-               extends AbstractHeapState<K, N, V, ReducingState<V>, 
ReducingStateDescriptor<V>>
-               implements ReducingState<V> {
+               extends AbstractHeapMergingState<K, N, V, V, V, 
ReducingState<V>, ReducingStateDescriptor<V>>
+               implements InternalReducingState<N, V> {
 
        private final ReduceFunction<V> reduceFunction;
 
@@ -56,10 +57,15 @@ public class HeapReducingState<K, N, V>
                        StateTable<K, N, V> stateTable,
                        TypeSerializer<K> keySerializer,
                        TypeSerializer<N> namespaceSerializer) {
+
                super(backend, stateDesc, stateTable, keySerializer, 
namespaceSerializer);
                this.reduceFunction = stateDesc.getReduceFunction();
        }
 
+       // 
------------------------------------------------------------------------
+       //  state access
+       // 
------------------------------------------------------------------------
+
        @Override
        public V get() {
                Preconditions.checkState(currentNamespace != null, "No 
namespace set.");
@@ -111,13 +117,22 @@ public class HeapReducingState<K, N, V>
                if (currentValue == null) {
                        // we're good, just added the new value
                } else {
-                       V reducedValue = null;
+                       V reducedValue;
                        try {
                                reducedValue = 
reduceFunction.reduce(currentValue, value);
                        } catch (Exception e) {
-                               throw new RuntimeException("Could not add value 
to reducing state.", e);
+                               throw new IOException("Exception while applying 
ReduceFunction in reducing state", e);
                        }
                        keyedMap.put(backend.<K>getCurrentKey(), reducedValue);
                }
        }
+
+       // 
------------------------------------------------------------------------
+       //  state merging
+       // 
------------------------------------------------------------------------
+
+       @Override
+       protected V mergeState(V a, V b) throws Exception {
+               return reduceFunction.reduce(a, b);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
index cccaacb..9e042fe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Map;
@@ -36,7 +37,7 @@ import java.util.Map;
  */
 public class HeapValueState<K, N, V>
                extends AbstractHeapState<K, N, V, ValueState<V>, 
ValueStateDescriptor<V>>
-               implements ValueState<V> {
+               implements InternalValueState<N, V> {
 
        /**
         * Creates a new key/value state for the given hash map of key/value 
pairs.

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 797150a..9d7232e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.runtime.state.heap;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -78,4 +79,21 @@ public class StateTable<K, N, ST> {
        public List<Map<N, Map<K, ST>>> getState() {
                return state;
        }
+
+       // 
------------------------------------------------------------------------
+       //  for testing
+       // 
------------------------------------------------------------------------
+
+       @VisibleForTesting
+       boolean isEmpty() {
+               for (Map<N, Map<K, ST>> map : state) {
+                       if (map != null) {
+                               if (!map.isEmpty()) {
+                                       return false;
+                               }
+                       }
+               }
+
+               return true;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
new file mode 100644
index 0000000..ae9f457
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.AppendingState;
+
+/**
+ * The peer to the {@link AppendingState} in the internal state type hierarchy.
+ * 
+ * <p>See {@link InternalKvState} for a description of the internal state 
hierarchy.
+ * 
+ * @param <N>   The type of the namespace
+ * @param <IN>  The type of elements added to the state
+ * @param <OUT> The type of the value returned from the state
+ */
+public interface InternalAppendingState<N, IN, OUT> extends 
InternalKvState<N>, AppendingState<IN, OUT> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
new file mode 100644
index 0000000..eb58ce5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.FoldingState;
+
+/**
+ * The peer to the {@link FoldingState} in the internal state type hierarchy.
+ * 
+ * <p>See {@link InternalKvState} for a description of the internal state 
hierarchy.
+ * 
+ * @param <N> The type of the namespace
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ */
+public interface InternalFoldingState<N, T, ACC> extends 
InternalAppendingState<N, T, ACC>, FoldingState<T, ACC> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
new file mode 100644
index 0000000..06f64b6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.State;
+
+/**
+ * The {@code InternalKvState} is the root of the internal state type 
hierarchy, similar to the
+ * {@link State} being the root of the public API state hierarchy.
+ * 
+ * <p>The internal state classes give access to the namespace getters and 
setters and access to
+ * additional functionality, like raw value access or state merging.
+ * 
+ * <p>The public API state hierarchy is intended to be programmed against by 
Flink applications.
+ * The internal state hierarchy holds all the auxiliary methods that are used 
by the runtime and not
+ * intended to be used by user applications. These internal methods are 
considered of limited use to users
+ * and potentially confusing, and are usually not regarded as stable across releases.
+ * 
+ * <p>Each specific type in the internal state hierarchy extends the type from 
the public
+ * state hierarchy:
+ * 
+ * <pre>
+ *             State
+ *               |
+ *               +-------------------InternalKvState
+ *               |                         |
+ *          MergingState                   |
+ *               |                         |
+ *               +-----------------InternalMergingState
+ *               |                         |
+ *      +--------+------+                  |
+ *      |               |                  |
+ * ReducingState    ListState        +-----+-----------------+
+ *      |               |            |                       |
+ *      +-----------+   +-----------   -----------------InternalListState
+ *                  |                |
+ *                  +---------InternalReducingState
+ * </pre>
+ * 
+ * @param <N> The type of the namespace.
+ */
+public interface InternalKvState<N> extends State {
+
+       /**
+        * Sets the current namespace, which will be used when using the state 
access methods.
+        *
+        * @param namespace The namespace.
+        */
+       void setCurrentNamespace(N namespace);
+
+       /**
+        * Returns the serialized value for the given key and namespace.
+        *
+        * <p>If no value is associated with key and namespace, 
<code>null</code>
+        * is returned.
+        *
+        * @param serializedKeyAndNamespace Serialized key and namespace
+        * @return Serialized value or <code>null</code> if no value is 
associated with the key and namespace.
+        * 
+        * @throws Exception Exceptions during serialization are forwarded
+        */
+       byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws 
Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
new file mode 100644
index 0000000..ae392ed
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.ListState;
+
+/**
+ * The peer to the {@link ListState} in the internal state type hierarchy.
+ * 
+ * <p>See {@link InternalKvState} for a description of the internal state 
hierarchy.
+ * 
+ * @param <N> The type of the namespace
+ * @param <T> The type of elements in the list
+ */
+public interface InternalListState<N, T> extends InternalMergingState<N, T, 
Iterable<T>>, ListState<T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
new file mode 100644
index 0000000..abc7d7c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.MergingState;
+
+import java.util.Collection;
+
+/**
+ * The peer to the {@link MergingState} in the internal state type hierarchy.
+ * 
+ * <p>See {@link InternalKvState} for a description of the internal state 
hierarchy.
+ * 
+ * @param <N>   The type of the namespace
+ * @param <IN>  The type of elements added to the state
+ * @param <OUT> The type of the value returned from the state
+ */
+public interface InternalMergingState<N, IN, OUT> extends 
InternalAppendingState<N, IN, OUT>, MergingState<IN, OUT> {
+
+       /**
+        * Merges the state of the current key for the given source namespaces 
into the state of
+        * the target namespace.
+        * 
+        * @param target The target namespace where the merged state should be 
stored.
+        * @param sources The source namespaces whose state should be merged.
+        * 
+        * @throws Exception The method may forward exception thrown internally 
(by I/O or functions).
+        */
+       void mergeNamespaces(N target, Collection<N> sources) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
new file mode 100644
index 0000000..40e625c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.ReducingState;
+
+/**
+ * The peer to the {@link ReducingState} in the internal state type hierarchy.
+ * 
+ * <p>See {@link InternalKvState} for a description of the internal state 
hierarchy.
+ * 
+ * @param <N> The type of the namespace
+ * @param <T> The type of the values in the reducing state
+ */
+public interface InternalReducingState<N, T> extends InternalMergingState<N, 
T, T>, ReducingState<T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
new file mode 100644
index 0000000..7177b8a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.ValueState;
+
+/**
+ * The peer to the {@link ValueState} in the internal state type hierarchy.
+ * 
+ * <p>See {@link InternalKvState} for a description of the internal state 
hierarchy.
+ * 
+ * @param <N> The type of the namespace
+ * @param <T> The type of the value in the state
+ */
+public interface InternalValueState<N, T> extends InternalKvState<N>, 
ValueState<T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index 86f8766..1c02ca1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -43,8 +43,8 @@ import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -593,7 +593,7 @@ public class KvStateClientTest {
                                state.update(201 + i);
 
                                // we know it must be a KvStat but this is not 
exposed to the user via State
-                               KvState<?> kvState = (KvState<?>) state;
+                               InternalKvState<?> kvState = 
(InternalKvState<?>) state;
 
                                // Register KvState (one state instance for all 
server)
                                ids[i] = registry[i].registerKvState(new 
JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index b1ec86f..202024c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -40,9 +40,9 @@ import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -267,7 +267,7 @@ public class KvStateServerHandlerTest extends TestLogger {
        }
 
        /**
-        * Tests the failure response on a failure on the {@link 
KvState#getSerializedValue(byte[])}
+        * Tests the failure response on a failure on the {@link 
InternalKvState#getSerializedValue(byte[])}
         * call.
         */
        @Test
@@ -279,7 +279,7 @@ public class KvStateServerHandlerTest extends TestLogger {
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
                // Failing KvState
-               KvState<?> kvState = mock(KvState.class);
+               InternalKvState<?> kvState = mock(InternalKvState.class);
                when(kvState.getSerializedValue(any(byte[].class)))
                                .thenThrow(new RuntimeException("Expected test 
Exception"));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index 0d9c2e4..69dbe6f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.query.netty.message;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
-import org.apache.flink.api.common.state.ListState;
+
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -30,10 +30,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -315,7 +317,7 @@ public class KvStateRequestSerializerTest {
         */
        @Test
        public void testListSerialization() throws Exception {
-               final long key = 0l;
+               final long key = 0L;
 
                // objects for heap state list serialisation
                final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
@@ -327,9 +329,10 @@ public class KvStateRequestSerializerTest {
                        );
                longHeapKeyedStateBackend.setCurrentKey(key);
 
-               final ListState<Long> listState = longHeapKeyedStateBackend
-                       .createListState(VoidNamespaceSerializer.INSTANCE,
+               final InternalListState<VoidNamespace, Long> listState = 
longHeapKeyedStateBackend.createListState(
+                               VoidNamespaceSerializer.INSTANCE,
                                new ListStateDescriptor<>("test", 
LongSerializer.INSTANCE));
+
                testListSerialization(key, listState);
        }
 
@@ -340,19 +343,16 @@ public class KvStateRequestSerializerTest {
         * @param key
         *              key of the list state
         * @param listState
-        *              list state using the {@link VoidNamespace}, must also 
be a {@link
-        *              KvState} instance
+        *              list state using the {@link VoidNamespace}, must also 
be a {@link InternalKvState} instance
         *
         * @throws Exception
         */
-       public static void testListSerialization(final long key,
-               final ListState<Long> listState) throws Exception {
+       public static void testListSerialization(
+                       final long key,
+                       final InternalListState<VoidNamespace, Long> listState) 
throws Exception {
 
                TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
-
-               final KvState<VoidNamespace> listKvState =
-                       (KvState<VoidNamespace>) listState;
-               listKvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+               listState.setCurrentNamespace(VoidNamespace.INSTANCE);
 
                // List
                final int numElements = 10;
@@ -368,8 +368,8 @@ public class KvStateRequestSerializerTest {
                        KvStateRequestSerializer.serializeKeyAndNamespace(
                                key, LongSerializer.INSTANCE,
                                VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE);
-               final byte[] serializedValues =
-                       listKvState.getSerializedValue(serializedKey);
+               
+               final byte[] serializedValues = 
listState.getSerializedValue(serializedKey);
 
                List<Long> actualValues = 
KvStateRequestSerializer.deserializeList(serializedValues, valueSerializer);
                assertEquals(expectedValues, actualValues);

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 38e04aa..c560ab0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.query.KvStateRegistryListener;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.heap.AbstractHeapState;
 import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -167,7 +168,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                ValueState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
                @SuppressWarnings("unchecked")
-               KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+               InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
 
                // some modifications to the state
                backend.setCurrentKey(1);
@@ -214,7 +215,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                ValueState<String> restored1 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
                @SuppressWarnings("unchecked")
-               KvState<VoidNamespace> restoredKvState1 = 
(KvState<VoidNamespace>) restored1;
+               InternalKvState<VoidNamespace> restoredKvState1 = 
(InternalKvState<VoidNamespace>) restored1;
 
                backend.setCurrentKey(1);
                assertEquals("1", restored1.value());
@@ -230,7 +231,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                ValueState<String> restored2 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
                @SuppressWarnings("unchecked")
-               KvState<VoidNamespace> restoredKvState2 = 
(KvState<VoidNamespace>) restored2;
+               InternalKvState<VoidNamespace> restoredKvState2 = 
(InternalKvState<VoidNamespace>) restored2;
 
                backend.setCurrentKey(1);
                assertEquals("u1", restored2.value());
@@ -246,7 +247,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        }
 
        /**
-        * Tests {@link ValueState#value()} and {@link 
KvState#getSerializedValue(byte[])}
+        * Tests {@link ValueState#value()} and {@link 
InternalKvState#getSerializedValue(byte[])}
         * accessing the state concurrently. They should not get in the way of 
each
         * other.
         */
@@ -255,7 +256,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        public void testValueStateRace() throws Exception {
                final AbstractKeyedStateBackend<Integer> backend =
                        createKeyedBackend(IntSerializer.INSTANCE);
-               final Integer namespace = Integer.valueOf(1);
+               final Integer namespace = 1;
 
                final ValueStateDescriptor<String> kvId =
                        new ValueStateDescriptor<>("id", String.class);
@@ -270,7 +271,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        .getPartitionedState(namespace, IntSerializer.INSTANCE, 
kvId);
 
                @SuppressWarnings("unchecked")
-               final KvState<Integer> kvState = (KvState<Integer>) state;
+               final InternalKvState<Integer> kvState = 
(InternalKvState<Integer>) state;
 
                /**
                 * 1) Test that ValueState#value() before and after
@@ -496,7 +497,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                        ListState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
                        @SuppressWarnings("unchecked")
-                       KvState<VoidNamespace> kvState = 
(KvState<VoidNamespace>) state;
+                       InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
 
                        Joiner joiner = Joiner.on(",");
                        // some modifications to the state
@@ -544,7 +545,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                        ListState<String> restored1 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
                        @SuppressWarnings("unchecked")
-                       KvState<VoidNamespace> restoredKvState1 = 
(KvState<VoidNamespace>) restored1;
+                       InternalKvState<VoidNamespace> restoredKvState1 = 
(InternalKvState<VoidNamespace>) restored1;
 
                        backend.setCurrentKey(1);
                        assertEquals("1", joiner.join(restored1.get()));
@@ -560,7 +561,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                        ListState<String> restored2 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
                        @SuppressWarnings("unchecked")
-                       KvState<VoidNamespace> restoredKvState2 = 
(KvState<VoidNamespace>) restored2;
+                       InternalKvState<VoidNamespace> restoredKvState2 = 
(InternalKvState<VoidNamespace>) restored2;
 
                        backend.setCurrentKey(1);
                        assertEquals("1,u1", joiner.join(restored2.get()));
@@ -596,7 +597,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                        ReducingState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
                        @SuppressWarnings("unchecked")
-                       KvState<VoidNamespace> kvState = 
(KvState<VoidNamespace>) state;
+                       InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
 
                        // some modifications to the state
                        backend.setCurrentKey(1);
@@ -643,7 +644,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                        ReducingState<String> restored1 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
                        @SuppressWarnings("unchecked")
-                       KvState<VoidNamespace> restoredKvState1 = 
(KvState<VoidNamespace>) restored1;
+                       InternalKvState<VoidNamespace> restoredKvState1 = 
(InternalKvState<VoidNamespace>) restored1;
 
                        backend.setCurrentKey(1);
                        assertEquals("1", restored1.get());
@@ -659,7 +660,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                        ReducingState<String> restored2 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
                        @SuppressWarnings("unchecked")
-                       KvState<VoidNamespace> restoredKvState2 = 
(KvState<VoidNamespace>) restored2;
+                       InternalKvState<VoidNamespace> restoredKvState2 = 
(InternalKvState<VoidNamespace>) restored2;
 
                        backend.setCurrentKey(1);
                        assertEquals("1,u1", restored2.get());
@@ -698,7 +699,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                        FoldingState<Integer, String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
                        @SuppressWarnings("unchecked")
-                       KvState<VoidNamespace> kvState = 
(KvState<VoidNamespace>) state;
+                       InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
 
                        // some modifications to the state
                        backend.setCurrentKey(1);
@@ -746,7 +747,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                        FoldingState<Integer, String> restored1 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
                        @SuppressWarnings("unchecked")
-                       KvState<VoidNamespace> restoredKvState1 = 
(KvState<VoidNamespace>) restored1;
+                       InternalKvState<VoidNamespace> restoredKvState1 = 
(InternalKvState<VoidNamespace>) restored1;
 
                        backend.setCurrentKey(1);
                        assertEquals("Fold-Initial:,1", restored1.get());
@@ -763,7 +764,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        @SuppressWarnings("unchecked")
                        FoldingState<Integer, String> restored2 = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
                        @SuppressWarnings("unchecked")
-                       KvState<VoidNamespace> restoredKvState2 = 
(KvState<VoidNamespace>) restored2;
+                       InternalKvState<VoidNamespace> restoredKvState2 = 
(InternalKvState<VoidNamespace>) restored2;
 
                        backend.setCurrentKey(1);
                        assertEquals("Fold-Initial:,101", restored2.get());
@@ -1254,7 +1255,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                        VoidNamespaceSerializer.INSTANCE,
                                        desc);
 
-                       KvState<VoidNamespace> kvState = 
(KvState<VoidNamespace>) state;
+                       InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
                        assertTrue(kvState instanceof AbstractHeapState);
 
                        kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -1280,7 +1281,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                        VoidNamespaceSerializer.INSTANCE,
                                        desc);
 
-                       KvState<VoidNamespace> kvState = 
(KvState<VoidNamespace>) state;
+                       InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
                        assertTrue(kvState instanceof AbstractHeapState);
 
                        kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -1311,7 +1312,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                        VoidNamespaceSerializer.INSTANCE,
                                        desc);
 
-                       KvState<VoidNamespace> kvState = 
(KvState<VoidNamespace>) state;
+                       InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
                        assertTrue(kvState instanceof AbstractHeapState);
 
                        kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -1342,7 +1343,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                        VoidNamespaceSerializer.INSTANCE,
                                        desc);
 
-                       KvState<VoidNamespace> kvState = 
(KvState<VoidNamespace>) state;
+                       InternalKvState<VoidNamespace> kvState = 
(InternalKvState<VoidNamespace>) state;
                        assertTrue(kvState instanceof AbstractHeapState);
 
                        kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -1451,7 +1452,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
         * if it is not null.
         */
        private static <V, K, N> V getSerializedValue(
-                       KvState<N> kvState,
+                       InternalKvState<N> kvState,
                        K key,
                        TypeSerializer<K> keySerializer,
                        N namespace,
@@ -1475,7 +1476,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
         * if it is not null.
         */
        private static <V, K, N> List<V> getSerializedList(
-                       KvState<N> kvState,
+                       InternalKvState<N> kvState,
                        K key,
                        TypeSerializer<K> keySerializer,
                        N namespace,

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
new file mode 100644
index 0000000..33d60a0
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the simple Java heap objects implementation of the {@link 
ListState}.
+ */
+public class HeapListStateTest {
+
+       @Test
+       public void testAddAndGet() throws Exception {
+
+               final ListStateDescriptor<Long> stateDescr = new 
ListStateDescriptor<>("my-state", Long.class);
+               stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               final HeapKeyedStateBackend<String> keyedBackend = 
createKeyedBackend();
+
+               try {
+                       InternalListState<VoidNamespace, Long> state =
+                                       
keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr);
+                       state.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+                       keyedBackend.setCurrentKey("abc");
+                       assertNull(state.get());
+
+                       keyedBackend.setCurrentKey("def");
+                       assertNull(state.get());
+                       state.add(17L);
+                       state.add(11L);
+                       assertEquals(asList(17L, 11L), state.get());
+
+                       keyedBackend.setCurrentKey("abc");
+                       assertNull(state.get());
+
+                       keyedBackend.setCurrentKey("g");
+                       assertNull(state.get());
+                       state.add(1L);
+                       state.add(2L);
+
+                       keyedBackend.setCurrentKey("def");
+                       assertEquals(asList(17L, 11L), state.get());
+                       state.clear();
+                       assertNull(state.get());
+
+                       keyedBackend.setCurrentKey("g");
+                       state.add(3L);
+                       state.add(2L);
+                       state.add(1L);
+
+                       keyedBackend.setCurrentKey("def");
+                       assertNull(state.get());
+
+                       keyedBackend.setCurrentKey("g");
+                       assertEquals(asList(1L, 2L, 3L, 2L, 1L), state.get());
+                       state.clear();
+
+                       // make sure all lists / maps are cleared
+
+                       StateTable<String, VoidNamespace, ArrayList<Long>> 
stateTable =
+                                       ((HeapListState<String, VoidNamespace, 
Long>) state).stateTable;
+
+                       assertTrue(stateTable.isEmpty());
+               }
+               finally {
+                       keyedBackend.close();
+                       keyedBackend.dispose();
+               }
+       }
+
+       @Test
+       public void testMerging() throws Exception {
+
+               final ListStateDescriptor<Long> stateDescr = new 
ListStateDescriptor<>("my-state", Long.class);
+               stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               final Integer namespace1 = 1;
+               final Integer namespace2 = 2;
+               final Integer namespace3 = 3;
+
+               final Set<Long> expectedResult = new HashSet<>(asList(11L, 22L, 
33L, 44L, 55L));
+
+               final HeapKeyedStateBackend<String> keyedBackend = 
createKeyedBackend();
+
+               try {
+                       InternalListState<Integer, Long> state = 
keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
+
+                       // populate the different namespaces
+                       //  - abc spreads the values over three namespaces
+                       //  - def spreads the values over two namespaces (one 
empty)
+                       //  - ghi is empty
+                       //  - jkl has all elements already in the target 
namespace
+                       //  - mno has all elements already in one source 
namespace
+
+                       keyedBackend.setCurrentKey("abc");
+                       state.setCurrentNamespace(namespace1);
+                       state.add(33L);
+                       state.add(55L);
+
+                       state.setCurrentNamespace(namespace2);
+                       state.add(22L);
+                       state.add(11L);
+
+                       state.setCurrentNamespace(namespace3);
+                       state.add(44L);
+
+                       keyedBackend.setCurrentKey("def");
+                       state.setCurrentNamespace(namespace1);
+                       state.add(11L);
+                       state.add(44L);
+
+                       state.setCurrentNamespace(namespace3);
+                       state.add(22L);
+                       state.add(55L);
+                       state.add(33L);
+
+                       keyedBackend.setCurrentKey("jkl");
+                       state.setCurrentNamespace(namespace1);
+                       state.add(11L);
+                       state.add(22L);
+                       state.add(33L);
+                       state.add(44L);
+                       state.add(55L);
+
+                       keyedBackend.setCurrentKey("mno");
+                       state.setCurrentNamespace(namespace3);
+                       state.add(11L);
+                       state.add(22L);
+                       state.add(33L);
+                       state.add(44L);
+                       state.add(55L);
+
+                       keyedBackend.setCurrentKey("abc");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       validateResult(state.get(), expectedResult);
+
+                       keyedBackend.setCurrentKey("def");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       validateResult(state.get(), expectedResult);
+
+                       keyedBackend.setCurrentKey("ghi");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       assertNull(state.get());
+
+                       keyedBackend.setCurrentKey("jkl");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       validateResult(state.get(), expectedResult);
+
+                       keyedBackend.setCurrentKey("mno");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       validateResult(state.get(), expectedResult);
+
+                       // make sure all lists / maps are cleared
+
+                       keyedBackend.setCurrentKey("abc");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("def");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("ghi");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("jkl");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("mno");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       StateTable<String, Integer, ArrayList<Long>> stateTable 
= 
+                                       ((HeapListState<String, Integer, Long>) 
state).stateTable;
+
+                       assertTrue(stateTable.isEmpty());
+               }
+               finally {
+                       keyedBackend.close();
+                       keyedBackend.dispose();
+               }
+       }
+
+       private static HeapKeyedStateBackend<String> createKeyedBackend() 
throws Exception {
+               return new HeapKeyedStateBackend<>(
+                               mock(TaskKvStateRegistry.class),
+                               StringSerializer.INSTANCE,
+                               HeapListStateTest.class.getClassLoader(),
+                               16,
+                               new KeyGroupRange(0, 15));
+       }
+       
+       private static <T> void validateResult(Iterable<T> values, Set<T> 
expected) {
+               int num = 0;
+               for (T v : values) {
+                       num++;
+                       assertTrue(expected.contains(v));
+               }
+
+               assertEquals(expected.size(), num);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
new file mode 100644
index 0000000..e0929f1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the simple Java heap objects implementation of the {@link 
ReducingState}.
+ */
+public class HeapReducingStateTest {
+
+       /**
+        * Adds values under several keys in the {@link VoidNamespace} and checks
+        * that {@code get()} returns the per-key sum (the descriptor is created
+        * with an {@link AddingFunction}), returns {@code null} for keys that have
+        * no value or were cleared, and that the state table is empty once every
+        * key has been cleared.
+        */
+       @Test
+       public void testAddAndGet() throws Exception {
+
+               final ReducingStateDescriptor<Long> stateDescr =
+                               new ReducingStateDescriptor<>("my-state", new 
AddingFunction(), Long.class);
+               stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               final HeapKeyedStateBackend<String> keyedBackend = 
createKeyedBackend();
+
+               try {
+                       InternalReducingState<VoidNamespace, Long> state =
+                                       
keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
+                       state.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+                       keyedBackend.setCurrentKey("abc");
+                       assertNull(state.get());
+
+                       keyedBackend.setCurrentKey("def");
+                       assertNull(state.get());
+                       state.add(17L);
+                       state.add(11L);
+                       assertEquals(28L, state.get().longValue());
+
+                       // adding under "def" must not have affected "abc"
+                       keyedBackend.setCurrentKey("abc");
+                       assertNull(state.get());
+
+                       keyedBackend.setCurrentKey("g");
+                       assertNull(state.get());
+                       state.add(1L);
+                       state.add(2L);
+
+                       keyedBackend.setCurrentKey("def");
+                       assertEquals(28L, state.get().longValue());
+                       state.clear();
+                       assertNull(state.get());
+
+                       keyedBackend.setCurrentKey("g");
+                       state.add(3L);
+                       state.add(2L);
+                       state.add(1L);
+
+                       keyedBackend.setCurrentKey("def");
+                       assertNull(state.get());
+
+                       // "g" accumulated 1 + 2 + 3 + 2 + 1 = 9
+                       keyedBackend.setCurrentKey("g");
+                       assertEquals(9L, state.get().longValue());
+                       state.clear();
+
+                       // make sure all lists / maps are cleared
+
+                       StateTable<String, VoidNamespace, Long> stateTable =
+                                       ((HeapReducingState<String, 
VoidNamespace, Long>) state).stateTable;
+
+                       assertTrue(stateTable.isEmpty());
+               }
+               finally {
+                       keyedBackend.close();
+                       keyedBackend.dispose();
+               }
+       }
+
+       /**
+        * Tests {@code mergeNamespaces()}: for each key, namespaces 2 and 3 are
+        * merged into namespace 1, after which namespace 1 must hold the sum of
+        * all values added for that key (165 = 11 + 22 + 33 + 44 + 55), or
+        * {@code null} for the key that never received a value. Finally the
+        * target namespace is cleared for every key and the state table is
+        * asserted to be empty.
+        */
+       @Test
+       public void testMerging() throws Exception {
+
+               final ReducingStateDescriptor<Long> stateDescr = new 
ReducingStateDescriptor<>(
+                               "my-state", new AddingFunction(), Long.class);
+               stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+               final Integer namespace1 = 1;
+               final Integer namespace2 = 2;
+               final Integer namespace3 = 3;
+
+               final Long expectedResult = 165L;
+
+               final HeapKeyedStateBackend<String> keyedBackend = 
createKeyedBackend();
+
+               try {
+                       final InternalReducingState<Integer, Long> state =
+                                       
keyedBackend.createReducingState(IntSerializer.INSTANCE, stateDescr);
+
+                       // populate the different namespaces
+                       //  - abc spreads the values over three namespaces
+                       //  - def spreads the values over two namespaces (one 
empty)
+                       //  - ghi is empty
+                       //  - jkl has all elements already in the target 
namespace
+                       //  - mno has all elements already in one source 
namespace
+
+                       keyedBackend.setCurrentKey("abc");
+                       state.setCurrentNamespace(namespace1);
+                       state.add(33L);
+                       state.add(55L);
+
+                       state.setCurrentNamespace(namespace2);
+                       state.add(22L);
+                       state.add(11L);
+
+                       state.setCurrentNamespace(namespace3);
+                       state.add(44L);
+
+                       keyedBackend.setCurrentKey("def");
+                       state.setCurrentNamespace(namespace1);
+                       state.add(11L);
+                       state.add(44L);
+
+                       state.setCurrentNamespace(namespace3);
+                       state.add(22L);
+                       state.add(55L);
+                       state.add(33L);
+
+                       keyedBackend.setCurrentKey("jkl");
+                       state.setCurrentNamespace(namespace1);
+                       state.add(11L);
+                       state.add(22L);
+                       state.add(33L);
+                       state.add(44L);
+                       state.add(55L);
+
+                       keyedBackend.setCurrentKey("mno");
+                       state.setCurrentNamespace(namespace3);
+                       state.add(11L);
+                       state.add(22L);
+                       state.add(33L);
+                       state.add(44L);
+                       state.add(55L);
+
+                       // merge namespaces 2 and 3 into namespace 1 for each key
+                       // and check the merged value
+
+                       keyedBackend.setCurrentKey("abc");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       assertEquals(expectedResult, state.get());
+
+                       keyedBackend.setCurrentKey("def");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       assertEquals(expectedResult, state.get());
+
+                       keyedBackend.setCurrentKey("ghi");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       assertNull(state.get());
+
+                       keyedBackend.setCurrentKey("jkl");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       assertEquals(expectedResult, state.get());
+
+                       keyedBackend.setCurrentKey("mno");
+                       state.mergeNamespaces(namespace1, asList(namespace2, 
namespace3));
+                       state.setCurrentNamespace(namespace1);
+                       assertEquals(expectedResult, state.get());
+
+                       // make sure all lists / maps are cleared
+
+                       keyedBackend.setCurrentKey("abc");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("def");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("ghi");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("jkl");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+
+                       keyedBackend.setCurrentKey("mno");
+                       state.setCurrentNamespace(namespace1);
+                       state.clear();
+                       
+                       StateTable<String, Integer, Long> stateTable =
+                                       ((HeapReducingState<String, Integer, 
Long>) state).stateTable;
+
+                       assertTrue(stateTable.isEmpty());
+               }
+               finally {
+                       keyedBackend.close();
+                       keyedBackend.dispose();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates the {@link HeapKeyedStateBackend} used by the tests in this
+        * class.
+        *
+        * <p>The backend is built with a mocked {@link TaskKvStateRegistry}, the
+        * {@link StringSerializer} for keys, the class loader of this test class,
+        * the value 16 (presumably the number of key groups - confirm against the
+        * constructor) and the key group range [0, 15].
+        */
+       private static HeapKeyedStateBackend<String> createKeyedBackend() 
throws Exception {
+               return new HeapKeyedStateBackend<>(
+                               mock(TaskKvStateRegistry.class),
+                               StringSerializer.INSTANCE,
+                               HeapReducingStateTest.class.getClassLoader(),
+                               16,
+                               new KeyGroupRange(0, 15));
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test functions
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A {@link ReduceFunction} that combines two {@code Long} values by
+        * adding them.
+        */
+       @SuppressWarnings("serial")
+       private static class AddingFunction implements ReduceFunction<Long> {
+
+               @Override
+               public Long reduce(Long a, Long b)  {
+                       return a + b;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a21660c..6bb0a40 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -494,15 +494,34 @@ public abstract class AbstractStreamOperator<OUT>
                return getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, stateDescriptor);
        }
 
+
+       /**
+        * Gets or creates the keyed state for the given descriptor by delegating
+        * to the keyed state backend.
+        *
+        * @param namespaceSerializer serializer handed to the backend for the
+        *                            namespace type of the state
+        * @param stateDescriptor     descriptor of the requested state
+        * @return the state object returned by the keyed state backend
+        * @throws IllegalStateException if no keyed state store has been set
+        * @throws Exception if the keyed state backend fails
+        */
+       protected <N, S extends State, T> S getOrCreateKeyedState(
+                       TypeSerializer<N> namespaceSerializer,
+                       StateDescriptor<S, T> stateDescriptor) throws Exception 
{
+
+               if (keyedStateStore != null) {
+                       return 
keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
+               }
+               else {
+                       // note the trailing space after each sentence, so the
+                       // concatenated message reads correctly
+                       throw new IllegalStateException("Cannot create 
partitioned state. " +
+                                       "The keyed state backend has not been 
set. " +
+                                       "This indicates that the operator is 
not partitioned/keyed.");
+               }
+       }
+
        /**
         * Creates a partitioned state handle, using the state backend 
configured for this task.
+        * 
+        * TODO: NOTE: This method does a lot of work caching / retrieving 
states just to update the namespace.
+        *       This method should be removed for the sake of namespaces being 
lazily fetched from the keyed
+        *       state backend, or being set on the state directly.
         *
         * @throws IllegalStateException Thrown, if the key/value state was 
already initialized.
         * @throws Exception Thrown, if the state backend cannot create the 
key/value state.
         */
-       @SuppressWarnings("unchecked")
        protected <S extends State, N> S getPartitionedState(
-                       N namespace, TypeSerializer<N> namespaceSerializer,
+                       N namespace,
+                       TypeSerializer<N> namespaceSerializer,
                        StateDescriptor<S, ?> stateDescriptor) throws Exception 
{
 
                if (keyedStateStore != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 0d5d091..05c89dd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -35,7 +35,7 @@ import java.util.Set;
 
 /**
  * A {@link Window} that represents a time interval from {@code start} 
(inclusive) to
- * {@code start + size} (exclusive).
+ * {@code end} (exclusive).
  */
 @PublicEvolving
 public class TimeWindow extends Window {
@@ -48,14 +48,35 @@ public class TimeWindow extends Window {
                this.end = end;
        }
 
+       /**
+        * Gets the starting timestamp of the window. This is the first 
timestamp that belongs
+        * to this window.
+        * 
+        * @return The starting timestamp of this window.
+        */
        public long getStart() {
                return start;
        }
 
+       /**
+        * Gets the end timestamp of this window. The end timestamp is 
exclusive, meaning it
+        * is the first timestamp that does not belong to this window any more.
+        * 
+        * @return The exclusive end timestamp of this window.
+        */
        public long getEnd() {
                return end;
        }
 
+       /**
+        * Gets the largest timestamp that still belongs to this window.
+        * 
+        * <p>This timestamp is identical to {@code getEnd() - 1}.
+        * 
+        * @return The largest timestamp that still belongs to this window.
+        * 
+        * @see #getEnd() 
+        */
        @Override
        public long maxTimestamp() {
                return end - 1;
@@ -104,6 +125,13 @@ public class TimeWindow extends Window {
                return new TimeWindow(Math.min(start, other.start), 
Math.max(end, other.end));
        }
 
+       // 
------------------------------------------------------------------------
+       // Serializer
+       // 
------------------------------------------------------------------------
+
+       /**
+        * The serializer used to write the TimeWindow type.
+        */
        public static class Serializer extends TypeSerializer<TimeWindow> {
                private static final long serialVersionUID = 1L;
 
@@ -152,9 +180,7 @@ public class TimeWindow extends Window {
 
                @Override
                public TimeWindow deserialize(TimeWindow reuse, DataInputView 
source) throws IOException {
-                       long start = source.readLong();
-                       long end = source.readLong();
-                       return new TimeWindow(start, end);
+                       return deserialize(source);
                }
 
                @Override
@@ -179,6 +205,10 @@ public class TimeWindow extends Window {
                }
        }
 
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
        /**
         * Merge overlapping {@link TimeWindow}s. For use by merging
         * {@link 
org.apache.flink.streaming.api.windowing.assigners.WindowAssigner 
WindowAssigners}.

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
index 0e131ff..b17989c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
@@ -31,5 +31,10 @@ import org.apache.flink.annotation.PublicEvolving;
 @PublicEvolving
 public abstract class Window {
 
+       /**
+        * Gets the largest timestamp that still belongs to this window.
+        *
+        * @return The largest timestamp that still belongs to this window.
+        */
        public abstract long maxTimestamp();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 8c73878..d9c977a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -20,15 +20,16 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Iterables;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.AppendingState;
-import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -41,7 +42,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
 
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A {@link WindowOperator} that also allows an {@link Evictor} to be used.
@@ -56,40 +57,52 @@ import static java.util.Objects.requireNonNull;
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} 
assigns.
  */
 @Internal
-public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends 
WindowOperator<K, IN, Iterable<IN>, OUT, W> {
+public class EvictingWindowOperator<K, IN, OUT, W extends Window> 
+               extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
 
        private static final long serialVersionUID = 1L;
 
+       // 
------------------------------------------------------------------------
+       // these fields are set by the API stream graph builder to configure 
the operator 
+       
        private final Evictor<? super IN, ? super W> evictor;
 
+       private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> 
evictingWindowStateDescriptor;
+
+       // 
------------------------------------------------------------------------
+       // the fields below are instantiated once the operator runs in the 
runtime 
+
        private transient EvictorContext evictorContext;
 
-       private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> 
windowStateDescriptor;
+       private transient InternalListState<W, StreamRecord<IN>> 
evictingWindowState;
+
+       // 
------------------------------------------------------------------------
 
        public EvictingWindowOperator(WindowAssigner<? super IN, W> 
windowAssigner,
-               TypeSerializer<W> windowSerializer,
-               KeySelector<IN, K> keySelector,
-               TypeSerializer<K> keySerializer,
-               StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> 
windowStateDescriptor,
-               InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
-               Trigger<? super IN, ? super W> trigger,
-               Evictor<? super IN, ? super W> evictor,
-               long allowedLateness) {
+                       TypeSerializer<W> windowSerializer,
+                       KeySelector<IN, K> keySelector,
+                       TypeSerializer<K> keySerializer,
+                       StateDescriptor<? extends ListState<StreamRecord<IN>>, 
?> windowStateDescriptor,
+                       InternalWindowFunction<Iterable<IN>, OUT, K, W> 
windowFunction,
+                       Trigger<? super IN, ? super W> trigger,
+                       Evictor<? super IN, ? super W> evictor,
+                       long allowedLateness) {
 
                super(windowAssigner, windowSerializer, keySelector,
                        keySerializer, null, windowFunction, trigger, 
allowedLateness);
-               this.evictor = requireNonNull(evictor);
-               this.windowStateDescriptor = windowStateDescriptor;
+
+               this.evictor = checkNotNull(evictor);
+               this.evictingWindowStateDescriptor = 
checkNotNull(windowStateDescriptor);
        }
 
        @Override
-       @SuppressWarnings("unchecked")
        public void processElement(StreamRecord<IN> element) throws Exception {
                Collection<W> elementWindows = windowAssigner.assignWindows(
                                element.getValue(),
                                element.getTimestamp(),
                                windowAssignerContext);
 
+               @SuppressWarnings("unchecked")
                final K key = (K) getKeyedStateBackend().getCurrentKey();
 
                if (windowAssigner instanceof MergingWindowAssigner) {
@@ -119,11 +132,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                                                }
 
                                                                // merge the 
merged state windows into the newly resulting state window
-                                                               
getKeyedStateBackend().mergePartitionedStates(
-                                                                       
stateWindowResult,
-                                                                       
mergedStateWindows,
-                                                                       
windowSerializer,
-                                                                       
(StateDescriptor<? extends MergingState<?, ?>, ?>) windowStateDescriptor);
+                                                               
evictingWindowState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                                                        }
                                                });
 
@@ -137,9 +146,9 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                if (stateWindow == null) {
                                        throw new IllegalStateException("Window 
" + window + " is not in in-flight window set.");
                                }
-                               ListState<StreamRecord<IN>> windowState = 
getPartitionedState(
-                                       stateWindow, windowSerializer, 
windowStateDescriptor);
-                               windowState.add(element);
+                               
+                               
evictingWindowState.setCurrentNamespace(stateWindow);
+                               evictingWindowState.add(element);
 
                                context.key = key;
                                context.window = actualWindow;
@@ -149,16 +158,16 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                TriggerResult triggerResult = 
context.onElement(element);
 
                                if (triggerResult.isFire()) {
-                                       Iterable<StreamRecord<IN>> contents = 
windowState.get();
+                                       Iterable<StreamRecord<IN>> contents = 
evictingWindowState.get();
                                        if (contents == null) {
                                                // if we have no state, there 
is nothing to do
                                                continue;
                                        }
-                                       fire(actualWindow, contents, 
windowState);
+                                       fire(actualWindow, contents, 
evictingWindowState);
                                }
 
                                if (triggerResult.isPurge()) {
-                                       cleanup(actualWindow, windowState, 
mergingWindows);
+                                       cleanup(actualWindow, 
evictingWindowState, mergingWindows);
                                } else {
                                        registerCleanupTimer(actualWindow);
                                }
@@ -173,9 +182,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                        continue;
                                }
 
-                               ListState<StreamRecord<IN>> windowState = 
getPartitionedState(
-                                       window, windowSerializer, 
windowStateDescriptor);
-                               windowState.add(element);
+                               evictingWindowState.setCurrentNamespace(window);
+                               evictingWindowState.add(element);
 
                                context.key = key;
                                context.window = window;
@@ -185,16 +193,16 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                TriggerResult triggerResult = 
context.onElement(element);
 
                                if (triggerResult.isFire()) {
-                                       Iterable<StreamRecord<IN>> contents = 
windowState.get();
+                                       Iterable<StreamRecord<IN>> contents = 
evictingWindowState.get();
                                        if (contents == null) {
                                                // if we have no state, there 
is nothing to do
                                                continue;
                                        }
-                                       fire(window, contents, windowState);
+                                       fire(window, contents, 
evictingWindowState);
                                }
 
                                if (triggerResult.isPurge()) {
-                                       cleanup(window, windowState, null);
+                                       cleanup(window, evictingWindowState, 
null);
                                } else {
                                        registerCleanupTimer(window);
                                }
@@ -222,15 +230,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                // so it is safe to just ignore
                                return;
                        }
-                       windowState = getPartitionedState(stateWindow, 
windowSerializer, windowStateDescriptor);
+                       
+                       evictingWindowState.setCurrentNamespace(stateWindow);
                } else {
-                       windowState = getPartitionedState(
-                                       context.window,
-                                       windowSerializer,
-                                       windowStateDescriptor);
+                       evictingWindowState.setCurrentNamespace(context.window);
                }
 
-               Iterable<StreamRecord<IN>> contents = windowState.get();
+               Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
                if (contents == null) {
                        // if we have no state, there is nothing to do
                        return;
@@ -238,11 +244,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
 
                TriggerResult triggerResult = 
context.onEventTime(timer.getTimestamp());
                if (triggerResult.isFire()) {
-                       fire(context.window, contents, windowState);
+                       fire(context.window, contents, evictingWindowState);
                }
 
                if (triggerResult.isPurge() || (windowAssigner.isEventTime() && 
isCleanupTime(context.window, timer.getTimestamp()))) {
-                       cleanup(context.window, windowState, mergingWindows);
+                       cleanup(context.window, evictingWindowState, 
mergingWindows);
                }
        }
 
@@ -265,12 +271,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                // so it is safe to just ignore
                                return;
                        }
-                       windowState = getPartitionedState(stateWindow, 
windowSerializer, windowStateDescriptor);
+                       evictingWindowState.setCurrentNamespace(stateWindow);
                } else {
-                       windowState = getPartitionedState(context.window, 
windowSerializer, windowStateDescriptor);
+                       evictingWindowState.setCurrentNamespace(context.window);
                }
 
-               Iterable<StreamRecord<IN>> contents = windowState.get();
+               Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
                if (contents == null) {
                        // if we have no state, there is nothing to do
                        return;
@@ -278,11 +284,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
 
                TriggerResult triggerResult = 
context.onProcessingTime(timer.getTimestamp());
                if (triggerResult.isFire()) {
-                       fire(context.window, contents, windowState);
+                       fire(context.window, contents, evictingWindowState);
                }
 
                if (triggerResult.isPurge() || (!windowAssigner.isEventTime() 
&& isCleanupTime(context.window, timer.getTimestamp()))) {
-                       cleanup(context.window, windowState, mergingWindows);
+                       cleanup(context.window, evictingWindowState, 
mergingWindows);
                }
        }
 
@@ -381,7 +387,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
        @Override
        public void open() throws Exception {
                super.open();
+
                evictorContext = new EvictorContext(null,null);
+               evictingWindowState = (InternalListState<W, StreamRecord<IN>>) 
+                               getOrCreateKeyedState(windowSerializer, 
evictingWindowStateDescriptor);
        }
 
        @Override
@@ -409,6 +418,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
        @VisibleForTesting
        @SuppressWarnings("unchecked, rawtypes")
        public StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?> 
getStateDescriptor() {
-               return (StateDescriptor<? extends AppendingState<IN, 
Iterable<IN>>, ?>) windowStateDescriptor;
+               return (StateDescriptor<? extends AppendingState<IN, 
Iterable<IN>>, ?>) evictingWindowStateDescriptor;
        }
 }

Reply via email to