[FLINK-4856] Add MapState for keyed state

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30c9e2b6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30c9e2b6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30c9e2b6

Branch: refs/heads/master
Commit: 30c9e2b683bf7f4776ffc23b6a860946a4429ae5
Parents: de2605e
Author: xiaogang.sxg <[email protected]>
Authored: Fri Feb 17 11:19:18 2017 +0800
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Feb 23 16:56:29 2017 +0100

----------------------------------------------------------------------
 docs/dev/stream/state.md                        |   8 +-
 .../streaming/state/AbstractRocksDBState.java   |  49 +-
 .../state/RocksDBKeyedStateBackend.java         |  10 +
 .../streaming/state/RocksDBMapState.java        | 546 +++++++++++++++++++
 .../api/common/functions/RuntimeContext.java    |  42 ++
 .../util/AbstractRuntimeUDFContext.java         |   9 +
 .../flink/api/common/state/KeyedStateStore.java |  40 ++
 .../apache/flink/api/common/state/MapState.java | 134 +++++
 .../api/common/state/MapStateDescriptor.java    | 147 +++++
 .../flink/api/common/state/StateBinder.java     |   9 +
 .../flink/api/common/state/StateDescriptor.java |   2 +-
 .../common/typeutils/base/MapSerializer.java    | 193 +++++++
 .../flink/api/java/typeutils/MapTypeInfo.java   | 147 +++++
 .../common/state/MapStateDescriptorTest.java    | 115 ++++
 .../typeutils/base/MapSerializerTest.java       |  90 +++
 .../flink/hdfstests/FileStateBackendTest.java   |   4 +
 .../netty/message/KvStateRequestSerializer.java |  67 +++
 .../state/AbstractKeyedStateBackend.java        |  23 +-
 .../runtime/state/DefaultKeyedStateStore.java   |  14 +
 .../flink/runtime/state/HashMapSerializer.java  | 193 +++++++
 .../flink/runtime/state/UserFacingMapState.java | 103 ++++
 .../state/heap/HeapKeyedStateBackend.java       |  18 +-
 .../flink/runtime/state/heap/HeapMapState.java  | 311 +++++++++++
 .../state/internal/InternalMapState.java        |  32 ++
 .../message/KvStateRequestSerializerTest.java   | 131 +++++
 .../runtime/state/FileStateBackendTest.java     |   4 +
 .../runtime/state/MemoryStateBackendTest.java   |   4 +
 .../runtime/state/SerializationProxiesTest.java |   3 +-
 .../runtime/state/StateBackendTestBase.java     | 299 ++++++++++
 .../api/functions/async/RichAsyncFunction.java  |   7 +
 .../api/operators/StreamingRuntimeContext.java  |   9 +
 .../functions/async/RichAsyncFunctionTest.java  |   8 +-
 .../operators/StreamingRuntimeContextTest.java  |  85 ++-
 .../KVStateRequestSerializerRocksDBTest.java    |  40 ++
 34 files changed, 2887 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index e554e29..40522e1 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -118,6 +118,11 @@ added to the state. Contrary to `ReducingState`, the 
aggregate type may be diffe
 of elements that are added to the state. The interface is the same as for 
`ListState` but elements
 added using `add(T)` are folded into an aggregate using a specified 
`FoldFunction`.
 
+* `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value 
pairs into the state and
+retrieve an `Iterable` over all currently stored mappings. Mappings are added 
using `put(UK, UV)` or 
+`putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved 
using `get(UK)`. The iterable
+views for mappings, keys and values can be retrieved using `entries()`, 
`keys()` and `values()` respectively.
+
 All types of state also have a method `clear()` that clears the state for the 
currently
 active key, i.e. the key of the input element.
 
@@ -136,7 +141,7 @@ To get a state handle, you have to create a 
`StateDescriptor`. This holds the na
 that you can reference them), the type of the values that the state holds, and 
possibly
 a user-specified function, such as a `ReduceFunction`. Depending on what type 
of state you
 want to retrieve, you create either a `ValueStateDescriptor`, a 
`ListStateDescriptor`,
-a `ReducingStateDescriptor` or a `FoldingStateDescriptor`.
+a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a 
`MapStateDescriptor`.
 
 State is accessed using the `RuntimeContext`, so it is only possible in *rich 
functions*.
 Please see [here]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) for
@@ -147,6 +152,7 @@ is available in a `RichFunction` has these methods for 
accessing state:
 * `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
 * `ListState<T> getListState(ListStateDescriptor<T>)`
 * `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
+* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`
 
 This is an example `FlatMapFunction` that shows how all of the parts fit 
together:
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 89f41aa..569971a 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -21,7 +21,10 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
@@ -50,7 +53,7 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
                implements InternalKvState<N>, State {
 
        /** Serializer for the namespace */
-       private final TypeSerializer<N> namespaceSerializer;
+       final TypeSerializer<N> namespaceSerializer;
 
        /** The current namespace, which the next value methods will refer to */
        private N currentNamespace;
@@ -215,4 +218,48 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
                        value >>>= 8;
                } while (value != 0);
        }
+       
+       protected Tuple3<Integer, K, N> 
readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, 
DataInputView inputView) throws IOException {
+               int keyGroup = readKeyGroup(inputView);
+               K key = readKey(inputStream, inputView);
+               N namespace = readNamespace(inputStream, inputView);
+
+               return new Tuple3<>(keyGroup, key, namespace);
+       }
+
+       private int readKeyGroup(DataInputView inputView) throws IOException {
+               int keyGroup = 0;
+               for (int i = 0; i < backend.getKeyGroupPrefixBytes(); ++i) {
+                       keyGroup <<= 8;
+                       keyGroup |= (inputView.readByte() & 0xFF);
+               }
+               return keyGroup;
+       }
+
+       private K readKey(ByteArrayInputStreamWithPos inputStream, 
DataInputView inputView) throws IOException {
+               int beforeRead = inputStream.getPosition();
+               K key = backend.getKeySerializer().deserialize(inputView);
+               if (ambiguousKeyPossible) {
+                       int length = inputStream.getPosition() - beforeRead;
+                       readVariableIntBytes(inputView, length);
+               }
+               return key;
+       }
+
+       private N readNamespace(ByteArrayInputStreamWithPos inputStream, 
DataInputView inputView) throws IOException {
+               int beforeRead = inputStream.getPosition();
+               N namespace = namespaceSerializer.deserialize(inputView);
+               if (ambiguousKeyPossible) {
+                       int length = inputStream.getPosition() - beforeRead;
+                       readVariableIntBytes(inputView, length);
+               }
+               return namespace;
+       }
+
+       private void readVariableIntBytes(DataInputView inputView, int value) 
throws IOException {
+               do {
+                       inputView.readByte();
+                       value >>>= 8;
+               } while (value != 0);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index d8d77b6..a0efe78 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
@@ -53,6 +54,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.SerializableObject;
@@ -882,6 +884,14 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                return new RocksDBFoldingState<>(columnFamily, 
namespaceSerializer, stateDesc, this);
        }
 
+       @Override
+       protected <N, UK, UV> InternalMapState<N, UK, UV> 
createMapState(TypeSerializer<N> namespaceSerializer,
+                       MapStateDescriptor<UK, UV> stateDesc) throws Exception {
+               ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
+
+               return new RocksDBMapState<>(columnFamily, namespaceSerializer, 
stateDesc, this);
+       }
+
        /**
         * Wraps a RocksDB iterator to cache it's current key and assign an id 
for the key/value state to the iterator.
         * Used by #MergeIterator.

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
new file mode 100644
index 0000000..e9e9d9b
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ * <p>
+ * <p>{@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we use 
for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param <K>  The type of the key.
+ * @param <N>  The type of the namespace.
+ * @param <UK> The type of the keys in the map state.
+ * @param <UV> The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+       extends AbstractRocksDBState<K, N, MapState<UK, UV>, 
MapStateDescriptor<UK, UV>, Map<UK, UV>>
+       implements InternalMapState<N, UK, UV> {
+       
+       private static Logger LOG = 
LoggerFactory.getLogger(RocksDBMapState.class);
+
+       /** Serializer for the keys and values */
+       private final TypeSerializer<UK> userKeySerializer;
+       private final TypeSerializer<UV> userValueSerializer;
+
+       /**
+        * We disable writes to the write-ahead-log here. We can't have these 
in the base class
+        * because JNI segfaults for some reason if they are.
+        */
+       private final WriteOptions writeOptions;
+
+       /**
+        * Creates a new {@code RocksDBMapState}.
+        *
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state.
+        */
+       public RocksDBMapState(ColumnFamilyHandle columnFamily,
+                       TypeSerializer<N> namespaceSerializer,
+                       MapStateDescriptor<UK, UV> stateDesc,
+                       RocksDBKeyedStateBackend<K> backend) {
+
+               super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+               this.userKeySerializer = stateDesc.getKeySerializer();
+               this.userValueSerializer = stateDesc.getValueSerializer();
+
+               writeOptions = new WriteOptions();
+               writeOptions.setDisableWAL(true);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  MapState Implementation
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public UV get(UK userKey) throws IOException, RocksDBException {
+               byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+               byte[] rawValueBytes = backend.db.get(columnFamily, 
rawKeyBytes);
+
+               return (rawValueBytes == null ? null : 
deserializeUserValue(rawValueBytes));
+       }
+
+       @Override
+       public void put(UK userKey, UV userValue) throws IOException, 
RocksDBException {
+               
+               byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+               byte[] rawValueBytes = serializeUserValue(userValue);
+
+               backend.db.put(columnFamily, writeOptions, rawKeyBytes, 
rawValueBytes);
+       }
+       
+       @Override
+       public void putAll(Map<UK, UV> map) throws IOException, 
RocksDBException {
+               if (map == null) {
+                       return;
+               }
+               
+               for (Map.Entry<UK, UV> entry : map.entrySet()) {
+                       put(entry.getKey(), entry.getValue());
+               }
+       }
+
+       @Override
+       public void remove(UK userKey) throws IOException, RocksDBException {
+               byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+
+               backend.db.remove(columnFamily, writeOptions, rawKeyBytes);
+       }
+
+       @Override
+       public boolean contains(UK userKey) throws IOException, 
RocksDBException {
+               byte[] rawKeyBytes = 
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+               byte[] rawValueBytes = backend.db.get(columnFamily, 
rawKeyBytes);
+
+               return (rawValueBytes != null);
+       }
+
+       @Override
+       public int size() throws IOException, RocksDBException {
+               Iterator<Map.Entry<UK, UV>> iterator = iterator();
+
+               int count = 0;
+               while (iterator.hasNext()) {
+                       count++;
+                       iterator.next();
+               }
+
+               return count;
+       }
+       
+       @Override
+       public Iterable<Map.Entry<UK, UV>> entries() throws IOException, 
RocksDBException {
+               final Iterator<Map.Entry<UK, UV>> iterator = iterator();
+
+               // Return null to make the behavior consistent with other 
states.
+               if (!iterator.hasNext()) {
+                       return null;
+               } else {
+                       return new Iterable<Map.Entry<UK, UV>>() {
+                               @Override
+                               public Iterator<Map.Entry<UK, UV>> iterator() {
+                                       return iterator;
+                               }
+                       };
+               }
+       }
+
+       @Override
+       public Iterable<UK> keys() throws IOException, RocksDBException {
+               final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+               
+               return new Iterable<UK>() {
+                       @Override
+                       public Iterator<UK> iterator() {
+                               return new RocksDBMapIterator<UK>(backend.db, 
prefixBytes) {
+                                       @Override
+                                       public UK next() {
+                                               RocksDBMapEntry entry = 
nextEntry();
+                                               return (entry == null ? null : 
entry.getKey());
+                                       }
+                               };
+                       }
+               };
+       }
+
+       @Override
+       public Iterable<UV> values() throws IOException, RocksDBException {
+               final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+               
+               return new Iterable<UV>() {
+                       @Override
+                       public Iterator<UV> iterator() {
+                               return new RocksDBMapIterator<UV>(backend.db, 
prefixBytes) {
+                                       @Override
+                                       public UV next() {
+                                               RocksDBMapEntry entry = 
nextEntry();
+                                               return (entry == null ? null : 
entry.getValue());
+                                       }
+                               };
+                       }
+               };
+       }
+
+       @Override
+       public Iterator<Map.Entry<UK, UV>> iterator() throws IOException, 
RocksDBException {
+               final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+
+               return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, 
prefixBytes) {
+                       @Override
+                       public Map.Entry<UK, UV> next() {
+                               return nextEntry();
+                       }
+               };
+       }
+       
+       @Override
+       public void clear() {
+               try {
+                       Iterator<Map.Entry<UK, UV>> iterator = iterator();
+
+                       while (iterator.hasNext()) {
+                               iterator.next();
+                               iterator.remove();
+                       }
+               } catch (Exception e) {
+                       LOG.warn("Error while cleaning the state.", e);
+               }
+       }
+       
+       @Override
+       @SuppressWarnings("unchecked")
+       public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) 
throws Exception {
+               Preconditions.checkNotNull(serializedKeyAndNamespace, 
"Serialized key and namespace");
+
+               //TODO make KvStateRequestSerializer key-group aware to save 
this round trip and key-group computation
+               Tuple2<K, N> des = 
KvStateRequestSerializer.deserializeKeyAndNamespace(
+                               serializedKeyAndNamespace,
+                               backend.getKeySerializer(),
+                               namespaceSerializer);
+
+               int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, 
backend.getNumberOfKeyGroups());
+               
+               ByteArrayOutputStreamWithPos outputStream = new 
ByteArrayOutputStreamWithPos(128);
+               DataOutputViewStreamWrapper outputView = new 
DataOutputViewStreamWrapper(outputStream);
+               writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1, 
outputStream, outputView);
+               final byte[] keyPrefixBytes = outputStream.toByteArray();
+
+               final Iterator<Map.Entry<UK, UV>> iterator = new 
RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, keyPrefixBytes) {
+                       @Override
+                       public Map.Entry<UK, UV> next() {
+                               return nextEntry();
+                       }
+               };
+
+               // Return null to make the behavior consistent with other 
backends
+               if (!iterator.hasNext()) {
+                       return null;
+               }
+               
+               return KvStateRequestSerializer.serializeMap(new 
Iterable<Map.Entry<UK, UV>>() {
+                       @Override
+                       public Iterator<Map.Entry<UK, UV>> iterator() {
+                               return iterator;
+                       }
+               }, userKeySerializer, userValueSerializer);
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Serialization Methods
+       // 
------------------------------------------------------------------------
+       
+       private byte[] serializeCurrentKeyAndNamespace() throws IOException {
+               writeCurrentKeyWithGroupAndNamespace();
+               
+               return keySerializationStream.toByteArray();
+       }
+
+       private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) 
throws IOException {
+               writeCurrentKeyWithGroupAndNamespace();
+               userKeySerializer.serialize(userKey, 
keySerializationDataOutputView);
+               
+               return keySerializationStream.toByteArray();
+       }
+
+       private byte[] serializeUserValue(UV userValue) throws IOException {
+               keySerializationStream.reset();
+
+               if (userValue == null) {
+                       keySerializationDataOutputView.writeBoolean(true);
+               } else {
+                       keySerializationDataOutputView.writeBoolean(false);
+                       userValueSerializer.serialize(userValue, 
keySerializationDataOutputView);
+               }
+
+               
+               return keySerializationStream.toByteArray();
+       }
+
+       private UK deserializeUserKey(byte[] rawKeyBytes) throws IOException {
+               ByteArrayInputStreamWithPos bais = new 
ByteArrayInputStreamWithPos(rawKeyBytes);
+               DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
+
+               readKeyWithGroupAndNamespace(bais, in);
+       
+               return userKeySerializer.deserialize(in);
+       }
+
+       private UV deserializeUserValue(byte[] rawValueBytes) throws 
IOException {
+               ByteArrayInputStreamWithPos bais = new 
ByteArrayInputStreamWithPos(rawValueBytes);
+               DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
+
+               boolean isNull = in.readBoolean();
+
+               return isNull ? null : userValueSerializer.deserialize(in);
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Internal Classes
+       // 
------------------------------------------------------------------------
+       
+       /** A map entry in RocksDBMapState */
+       private class RocksDBMapEntry implements Map.Entry<UK, UV> {
+               private final RocksDB db;
+               
+               /** The raw bytes of the key stored in RocksDB. Each user key 
is stored in RocksDB
+                * with the format #KeyGroup#Key#Namespace#UserKey. */
+               private final byte[] rawKeyBytes;
+               
+               /** The raw bytes of the value stored in RocksDB */
+               private byte[] rawValueBytes;
+
+               /** True if the entry has been deleted. */
+               private boolean deleted;
+
+               /** The user key and value. The deserialization is performed 
lazily, i.e. the key
+                * and the value is deserialized only when they are accessed. */
+               private UK userKey = null;
+               private UV userValue = null;
+
+               RocksDBMapEntry(final RocksDB db, final byte[] rawKeyBytes, 
final byte[] rawValueBytes) {
+                       this.db = db;
+                       
+                       this.rawKeyBytes = rawKeyBytes;
+                       this.rawValueBytes = rawValueBytes;
+                       this.deleted = false;
+               }
+
+               public void remove() {
+                       deleted = true;
+                       rawValueBytes = null;
+
+                       try {
+                               db.remove(columnFamily, writeOptions, 
rawKeyBytes);
+                       } catch (RocksDBException e) {
+                               throw new RuntimeException("Error while 
removing data from RocksDB.", e);
+                       }
+               }
+
+               @Override
+               public UK getKey() {
+                       if (userKey == null) {
+                               try {
+                                       userKey = 
deserializeUserKey(rawKeyBytes);
+                               } catch (IOException e) {
+                                       throw new RuntimeException("Error while 
deserializing the user key.");
+                               }
+                       }
+
+                       return userKey;
+               }
+
+               @Override
+               public UV getValue() {
+                       if (deleted) {
+                               return null;
+                       } else {
+                               if (userValue == null) {
+                                       try {
+                                               userValue = 
deserializeUserValue(rawValueBytes);
+                                       } catch (IOException e) {
+                                               throw new 
RuntimeException("Error while deserializing the user value.");
+                                       }
+                               }
+
+                               return userValue;
+                       }
+               }
+
+               @Override
+               public UV setValue(UV value) {
+                       if (deleted) {
+                               throw new IllegalStateException("The value has 
already been deleted.");
+                       }
+
+                       UV oldValue = getValue();
+                       
+                       try {
+                               userValue = value;
+                               rawValueBytes = serializeUserValue(value);
+
+                               db.put(columnFamily, writeOptions, rawKeyBytes, 
rawValueBytes);
+                       } catch (IOException | RocksDBException e) {
+                               throw new RuntimeException("Error while putting 
data into RocksDB.", e);
+                       }
+
+                       return oldValue;
+               }
+       }
+
+       /** An auxiliary utility to scan all entries under the given key. */
+       private abstract class RocksDBMapIterator<T> implements Iterator<T> {
+
+               final static int CACHE_SIZE_BASE = 1;
+               final static int CACHE_SIZE_LIMIT = 128;
+
+               /** The db where data resides. */
+               private final RocksDB db;
+
+               /** 
+                * The prefix bytes of the key being accessed. All entries 
under the same key
+                * has the same prefix, hence we can stop the iterating once 
coming across an
+                * entry with a different prefix. 
+                */
+               private final byte[] keyPrefixBytes;
+
+               /**
+                * True if all entries have been accessed or the iterator has 
come across an
+                * entry with a different prefix. 
+                */
+               private boolean expired = false;
+
+               /** A in-memory cache for the entries in the rocksdb. */
+               private ArrayList<RocksDBMapEntry> cacheEntries = new 
ArrayList<>();
+               private int cacheIndex = 0;
+
+
+               RocksDBMapIterator(final RocksDB db, final byte[] 
keyPrefixBytes) {
+                       this.db = db;
+                       this.keyPrefixBytes = keyPrefixBytes;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       loadCache();
+
+                       return (cacheIndex < cacheEntries.size());
+               }
+
+               @Override
+               public void remove() {
+                       if (cacheIndex == 0 || cacheIndex > 
cacheEntries.size()) {
+                               throw new IllegalStateException("The remove 
operation must be called after an valid next operation.");
+                       } 
+
+                       RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex 
- 1);
+                       lastEntry.remove();
+               }
+
+               final RocksDBMapEntry nextEntry() {
+                       loadCache();
+
+                       if (cacheIndex == cacheEntries.size()) {
+                               if (!expired) {
+                                       throw new IllegalStateException();
+                               }
+
+                               return null;
+                       }
+
+                       RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
+                       cacheIndex++;
+
+                       return entry;
+               }
+
+               private void loadCache() {
+                       if (cacheIndex > cacheEntries.size()) {
+                               throw new IllegalStateException();
+                       }
+
+                       // Load cache entries only when the cache is empty and 
there still exist unread entries
+                       if (cacheIndex < cacheEntries.size() || expired) {
+                               return;
+                       }
+
+                       RocksIterator iterator = db.newIterator(columnFamily);
+
+                       /*
+                        * The iteration starts from the prefix bytes at the 
first loading. The cache then is
+                        * reloaded when the next entry to return is the last 
one in the cache. At that time,
+                        * we will start the iterating from the last returned 
entry.
+                        */
+                       RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? 
null : cacheEntries.get(cacheEntries.size() - 1);
+                       byte[] startBytes = (lastEntry == null ? keyPrefixBytes 
: lastEntry.rawKeyBytes);
+                       int numEntries = (lastEntry == null ? CACHE_SIZE_BASE : 
Math.min(cacheEntries.size() * 2, CACHE_SIZE_LIMIT));
+
+                       cacheEntries.clear();
+                       cacheIndex = 0;
+
+                       iterator.seek(startBytes);
+
+                       /* 
+                        * If the last returned entry is not deleted, it will 
be the first entry in the
+                        * iterating. Skip it to avoid redundant access in such 
cases.
+                        */
+                       if (lastEntry != null && !lastEntry.deleted) {
+                               iterator.next();
+                       }
+
+                       while (true) {
+                               if (!iterator.isValid() || 
!underSameKey(iterator.key())) {
+                                       expired = true;
+                                       break;
+                               }
+
+                               if (cacheEntries.size() >= numEntries) {
+                                       break;
+                               }
+
+                               RocksDBMapEntry entry = new RocksDBMapEntry(db, 
iterator.key(), iterator.value());
+                               cacheEntries.add(entry);
+
+                               iterator.next();
+                       }
+
+                       iterator.close();
+               }
+               
+               private boolean underSameKey(byte[] rawKeyBytes) {
+                       if (rawKeyBytes.length < keyPrefixBytes.length) {
+                               return false;
+                       }
+
+                       for (int i = 0; i < keyPrefixBytes.length; ++i) {
+                               if (rawKeyBytes[i] != keyPrefixBytes[i]) {
+                                       return false;
+                               }
+                       }
+
+                       return true;
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 405e390..98ad018 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -31,6 +31,8 @@ import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -387,4 +389,44 @@ public interface RuntimeContext {
         */
        @PublicEvolving
        <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, 
ACC> stateProperties);
+       
+       /**
+        * Gets a handle to the system's key/value map state. This state is 
similar to the state
+        * accessed via {@link #getState(ValueStateDescriptor)}, but is 
optimized for state that
+        * is composed of user-defined key-value pairs
+        *
+        * <p>This state is only accessible if the function is executed on a 
KeyedStream.
+        *
+        * <pre>{@code
+        * DataStream<MyType> stream = ...;
+        * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+        *
+        * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
+        *
+        *     private MapState<MyType, Long> state;
+        *
+        *     public void open(Configuration cfg) {
+        *         state = getRuntimeContext().getMapState(
+        *                 new MapStateDescriptor<>("sum", MyType.class, 
Long.class));
+        *     }
+        *
+        *     public Tuple2<MyType, Long> map(MyType value) {
+        *         return new Tuple2<>(value, state.get(value));
+        *     }
+        * });
+        *
+        * }</pre>
+        *
+        * @param stateProperties The descriptor defining the properties of the 
stats.
+        *
+        * @param <UK> The type of the user keys stored in the state.
+        * @param <UV> The type of the user values stored in the state.
+        *
+        * @return The partitioned state object.
+        *
+        * @throws UnsupportedOperationException Thrown, if no partitioned 
state is available for the
+        *                                       function (function is not part 
of a KeyedStream).
+        */
+       @PublicEvolving
+       <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> 
stateProperties);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 0eafeaa..2538799 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -33,6 +33,8 @@ import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -214,4 +216,11 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
                throw new UnsupportedOperationException(
                                "This state is only accessible by functions 
executed on a KeyedStream");
        }
+       
+       @Override
+       @PublicEvolving
+       public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> 
stateProperties) {
+               throw new UnsupportedOperationException(
+                               "This state is only accessible by functions 
executed on a KeyedStream");
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
index bbb4c67..2187f6c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
@@ -196,4 +196,44 @@ public interface KeyedStateStore {
         */
        @PublicEvolving
        <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, 
ACC> stateProperties);
+
+       /**
+        * Gets a handle to the system's key/value map state. This state is 
similar to the state
+        * accessed via {@link #getState(ValueStateDescriptor)}, but is 
optimized for state that
+        * is composed of user-defined key-value pairs
+        *
+        * <p>This state is only accessible if the function is executed on a 
KeyedStream.
+        *
+        * <pre>{@code
+        * DataStream<MyType> stream = ...;
+        * KeyedStream<MyType> keyedStream = stream.keyBy("id");
+        *
+        * keyedStream.map(new RichMapFunction<MyType, List<MyType>>() {
+        *
+        *     private MapState<MyType, Long> state;
+        *
+        *     public void open(Configuration cfg) {
+        *         state = getRuntimeContext().getMapState(
+        *                 new MapStateDescriptor<>("sum", MyType.class, 
Long.class));
+        *     }
+        *
+        *     public Tuple2<MyType, Long> map(MyType value) {
+        *         return new Tuple2<>(value, state.get(value));
+        *     }
+        * });
+        *
+        * }</pre>
+        *
+        * @param stateProperties The descriptor defining the properties of the 
stats.
+        *
+        * @param <UK> The type of the user keys stored in the state.
+        * @param <UV> The type of the user values stored in the state.
+        *
+        * @return The partitioned state object.
+        *
+        * @throws UnsupportedOperationException Thrown, if no partitioned 
state is available for the
+        *                                       function (function is not part 
of a KeyedStream).
+        */
+       @PublicEvolving
+       <UK, UV> MapState<UK,UV> getMapState(MapStateDescriptor<UK, UV> 
stateProperties);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
new file mode 100644
index 0000000..fa657ef
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link State} interface for partitioned key-value state. The key-value pair 
can be
+ * added, updated and retrieved.
+ *
+ * <p>The state is accessed and modified by user functions, and checkpointed 
consistently
+ * by the system as part of the distributed snapshots.
+ *
+ * <p>The state is only accessible by functions applied on a KeyedDataStream. 
The key is
+ * automatically supplied by the system, so the function always sees the value 
mapped to the
+ * key of the current element. That way, the system can handle stream and 
state partitioning
+ * consistently together.
+ *
+ * @param <UK> Type of the keys in the state.
+ * @param <UV> Type of the values in the state.
+ */
+@PublicEvolving
+public interface MapState<UK, UV> extends State {
+
+       /**
+        * Returns the current value associated with the given key.
+        *
+        * @param key The key of the mapping
+        * @return The value of the mapping with the given key
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       UV get(UK key) throws Exception;
+
+       /**
+        * Associates a new value with the given key.
+        *
+        * @param key The key of the mapping
+        * @param value The new value of the mapping
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       void put(UK key, UV value) throws Exception;
+       
+       /**
+        * Copies all of the mappings from the given map into the state.
+        *
+        * @param map The mappings to be stored in this state
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       void putAll(Map<UK, UV> map) throws Exception;
+
+       /**
+        * Deletes the mapping of the given key.
+        *
+        * @param key The key of the mapping
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       void remove(UK key) throws Exception;
+
+       /**
+        * Returns whether there exists the given mapping.
+        *
+        * @param key The key of the mapping
+        * @return True if there exists a mapping whose key equals to the given 
key
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       boolean contains(UK key) throws Exception;
+
+       /**
+        * @return The number of mappings in the state.
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       int size() throws Exception;
+
+       /**
+        * Returns all the mappings in the state
+        *
+        * @return An iterable view of all the key-value pairs in the state.
+        * 
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       Iterable<Map.Entry<UK, UV>> entries() throws Exception;
+       
+       /**
+        * Returns all the keys in the state
+        *
+        * @return An iterable view of all the keys in the state.
+        * 
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       Iterable<UK> keys() throws Exception;
+
+       /**
+        * Returns all the values in the state.
+        *
+        * @return An iterable view of all the values in the state.
+        * 
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       Iterable<UV> values() throws Exception;
+
+       /**
+        * Iterates over all the mappings in the state.
+        *
+        * @return An iterator over all the mappings in the state
+        * 
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
new file mode 100644
index 0000000..d4a49f8
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+
+import java.util.Map;
+
+/**
+ * A {@link StateDescriptor} for {@link MapState}. This can be used to create 
state where the type
+ * is a map that can be updated and iterated over.
+ * 
+ * <p>Using {@code MapState} is typically more efficient than manually 
maintaining a map in a
+ * {@link ValueState}, because the backing implementation can support 
efficient updates, rather then
+ * replacing the full map on write.
+ * 
+ * <p>To create keyed map state (on a KeyedStream), use 
+ * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getMapState(MapStateDescriptor)}.
+ *
+ * @param <UK> The type of the keys that can be added to the map state.
+ */
+@PublicEvolving
+public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, 
UV>, Map<UK, UV>> {
+
+       /**
+        * Create a new {@code MapStateDescriptor} with the given name and the 
given type serializers.
+        *
+        * @param name The name of the {@code MapStateDescriptor}.
+        * @param keySerializer The type serializer for the keys in the state.
+        * @param valueSerializer The type serializer for the values in the 
state.
+        */
+       public MapStateDescriptor(String name, TypeSerializer<UK> 
keySerializer, TypeSerializer<UV> valueSerializer) {
+               super(name, new MapSerializer<>(keySerializer, 
valueSerializer), null);
+       }
+
+       /**
+        * Create a new {@code MapStateDescriptor} with the given name and the 
given type informations.
+        *
+        * @param name The name of the {@code MapStateDescriptor}.
+        * @param keyTypeInfo The type information for the keys in the state.
+        * @param valueTypeInfo The type information for the values in the 
state.
+        */
+       public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, 
TypeInformation<UV> valueTypeInfo) {
+               super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), 
null);
+       }
+
+       /**
+        * Create a new {@code MapStateDescriptor} with the given name and the 
given type information.
+        *
+        * <p>If this constructor fails (because it is not possible to describe 
the type via a class),
+        * consider using the {@link #MapStateDescriptor(String, 
TypeInformation, TypeInformation)} constructor.
+        *
+        * @param name The name of the {@code MapStateDescriptor}.
+        * @param keyClass The class of the type of keys in the state.
+        * @param valueClass The class of the type of values in the state.
+        */
+       public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> 
valueClass) {
+               super(name, new MapTypeInfo<>(keyClass, valueClass), null);
+       }
+
+       @Override
+       public MapState<UK, UV> bind(StateBinder stateBinder) throws Exception {
+               return stateBinder.createMapState(this);
+       }
+
+       @Override
+       public Type getType() {
+               return Type.MAP;
+       }
+
+       /**
+        * Gets the serializer for the keys in the state.
+        * 
+        * @return The serializer for the keys in the state.
+        */
+       public TypeSerializer<UK> getKeySerializer() {
+               final TypeSerializer<Map<UK, UV>> rawSerializer = 
getSerializer();
+               if (!(rawSerializer instanceof MapSerializer)) {
+                       throw new IllegalStateException("Unexpected serializer 
type.");
+               }
+
+               return ((MapSerializer<UK, UV>) 
rawSerializer).getKeySerializer();
+       }
+
+       /**
+        * Gets the serializer for the values in the state.
+        *
+        * @return The serializer for the values in the state.
+        */
+       public TypeSerializer<UV> getValueSerializer() {
+               final TypeSerializer<Map<UK, UV>> rawSerializer = 
getSerializer();
+               if (!(rawSerializer instanceof MapSerializer)) {
+                       throw new IllegalStateException("Unexpected serializer 
type.");
+               }
+
+               return ((MapSerializer<UK, UV>) 
rawSerializer).getValueSerializer();
+       }
+       
+       @Override
+       public int hashCode() {
+               int result = serializer.hashCode();
+               result = 31 * result + name.hashCode();
+               return result;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               MapStateDescriptor<?, ?> that = (MapStateDescriptor<?, ?>) o;
+               return serializer.equals(that.serializer) && 
name.equals(that.name);
+       }
+
+       @Override
+       public String toString() {
+               return "MapStateDescriptor{" +
+                               "name=" + name +
+                               ", serializer=" + serializer +
+                               '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
index 08dfc90..9df7a47 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
@@ -70,4 +70,13 @@ public interface StateBinder {
         * @param <ACC> Type of the value in the state
         */
        <T, ACC> FoldingState<T, ACC> 
createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
+
+       /**
+        * Creates and returns a new {@link MapState}.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <MK> Type of the keys in the state
+        * @param <MV> Type of the values in the state
+        */
+       <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> 
stateDesc) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 332e649..a52ea32 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -55,7 +55,7 @@ public abstract class StateDescriptor<S extends State, T> 
implements Serializabl
         */
        // IMPORTANT: Do not change the order of the elements in this enum, 
ordinal is used in serialization
        public enum Type {
-               @Deprecated UNKNOWN, VALUE, LIST, REDUCING, FOLDING, AGGREGATING
+               @Deprecated UNKNOWN, VALUE, LIST, REDUCING, FOLDING, 
AGGREGATING, MAP
        }
 
        private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
new file mode 100644
index 0000000..5e1a3bf
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * A serializer for {@link Map}. The serializer relies on a key serializer and 
a value serializer
+ * for the serialization of the map's key-value pairs.
+ *
+ * <p>The serialization format for the map is as follows: four bytes for the 
length of the map,
+ * followed by the serialized representation of each key-value pair. To allow 
null values, each value
+ * is prefixed by a null marker.
+ *
+ * @param <K> The type of the keys in the map.
+ * @param <V> The type of the values in the map.
+ */
+public class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
+
+       private static final long serialVersionUID = -6885593032367050078L;
+       
+       /** The serializer for the keys in the map */
+       private final TypeSerializer<K> keySerializer;
+
+       /** The serializer for the values in the map */
+       private final TypeSerializer<V> valueSerializer;
+
+       /**
+        * Creates a map serializer that uses the given serializers to 
serialize the key-value pairs in the map.
+        *
+        * @param keySerializer The serializer for the keys in the map
+        * @param valueSerializer The serializer for the values in the map
+        */
+       public MapSerializer(TypeSerializer<K> keySerializer, TypeSerializer<V> 
valueSerializer) {
+               this.keySerializer = Preconditions.checkNotNull(keySerializer, 
"The key serializer cannot be null");
+               this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer, "The value serializer cannot be 
null.");
+       }
+
+       // 
------------------------------------------------------------------------
+       //  MapSerializer specific properties
+       // 
------------------------------------------------------------------------
+
+       public TypeSerializer<K> getKeySerializer() {
+               return keySerializer;
+       }
+
+       public TypeSerializer<V> getValueSerializer() {
+               return valueSerializer;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Type Serializer implementation
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public TypeSerializer<Map<K, V>> duplicate() {
+               TypeSerializer<K> duplicateKeySerializer = 
keySerializer.duplicate();
+               TypeSerializer<V> duplicateValueSerializer = 
valueSerializer.duplicate();
+
+               return new MapSerializer<>(duplicateKeySerializer, 
duplicateValueSerializer);
+       }
+
+       @Override
+       public Map<K, V> createInstance() {
+               return new HashMap<>();
+       }
+
+       @Override
+       public Map<K, V> copy(Map<K, V> from) {
+               Map<K, V> newMap = new HashMap<>(from.size());
+
+               for (Map.Entry<K, V> entry : from.entrySet()) {
+                       K newKey = keySerializer.copy(entry.getKey());
+                       V newValue = entry.getValue() == null ? null : 
valueSerializer.copy(entry.getValue());
+
+                       newMap.put(newKey, newValue);
+               }
+
+               return newMap;
+       }
+
+       @Override
+       public Map<K, V> copy(Map<K, V> from, Map<K, V> reuse) {
+               return copy(from);
+       }
+
+       @Override
+       public int getLength() {
+               return -1; // var length
+       }
+
+       @Override
+       public void serialize(Map<K, V> map, DataOutputView target) throws 
IOException {
+               final int size = map.size();
+               target.writeInt(size);
+
+               for (Map.Entry<K, V> entry : map.entrySet()) {
+                       keySerializer.serialize(entry.getKey(), target);
+
+                       if (entry.getValue() == null) {
+                               target.writeBoolean(true);
+                       } else {
+                               target.writeBoolean(false);
+                               valueSerializer.serialize(entry.getValue(), 
target);
+                       }
+               }
+       }
+
+       @Override
+       public Map<K, V> deserialize(DataInputView source) throws IOException {
+               final int size = source.readInt();
+
+               final Map<K, V> map = new HashMap<>(size);
+               for (int i = 0; i < size; ++i) {
+                       K key = keySerializer.deserialize(source);
+
+                       boolean isNull = source.readBoolean();
+                       V value = isNull ? null : 
valueSerializer.deserialize(source);
+
+                       map.put(key, value);
+               }
+
+               return map;
+       }
+
+       @Override
+       public Map<K, V> deserialize(Map<K, V> reuse, DataInputView source) 
throws IOException {
+               return deserialize(source);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               final int size = source.readInt();
+               target.writeInt(size);
+
+               for (int i = 0; i < size; ++i) {
+                       keySerializer.copy(source, target);
+                       
+                       boolean isNull = source.readBoolean();
+                       target.writeBoolean(isNull);
+                       
+                       if (!isNull) {
+                               valueSerializer.copy(source, target);
+                       }
+               }
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               return obj == this ||
+                               (obj != null && obj.getClass() == getClass() &&
+                                               
keySerializer.equals(((MapSerializer<?, ?>) obj).getKeySerializer()) &&
+                                               
valueSerializer.equals(((MapSerializer<?, ?>) obj).getValueSerializer()));
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return (obj != null && obj.getClass() == getClass());
+       }
+
+       @Override
+       public int hashCode() {
+               return keySerializer.hashCode() * 31 + 
valueSerializer.hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MapTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MapTypeInfo.java 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MapTypeInfo.java
new file mode 100644
index 0000000..ca04e0c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MapTypeInfo.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Special {@code TypeInformation} used by {@link 
org.apache.flink.api.common.state.MapStateDescriptor}.
+ * 
+ * @param <K> The type of the keys in the map.
+ * @param <V> The type of the values in the map.
+ */
+@PublicEvolving
+public class MapTypeInfo<K, V> extends TypeInformation<Map<K, V>> {
+
+       /* The type information for the keys in the map*/
+       private final TypeInformation<K> keyTypeInfo;
+
+       /* The type information for the values in the map */
+       private final TypeInformation<V> valueTypeInfo;
+
+       public MapTypeInfo(TypeInformation<K> keyTypeInfo, TypeInformation<V> 
valueTypeInfo) {
+               this.keyTypeInfo = Preconditions.checkNotNull(keyTypeInfo, "The 
key type information cannot be null.");
+               this.valueTypeInfo = Preconditions.checkNotNull(valueTypeInfo, 
"The value type information cannot be null.");
+       }
+       
+       public MapTypeInfo(Class<K> keyClass, Class<V> valueClass) {
+               this.keyTypeInfo = of(checkNotNull(keyClass, "The key class 
cannot be null."));
+               this.valueTypeInfo = of(checkNotNull(valueClass, "The value 
class cannot be null."));
+       }
+
+       // 
------------------------------------------------------------------------
+       //  MapTypeInfo specific properties
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the type information for the keys in the map
+        */
+       public TypeInformation<K> getKeyTypeInfo() {
+               return keyTypeInfo;
+       }
+
+       /**
+        * Gets the type information for the values in the map
+        */
+       public TypeInformation<V> getValueTypeInfo() {
+               return valueTypeInfo;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  TypeInformation implementation
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public boolean isBasicType() {
+               return false;
+       }
+
+       @Override
+       public boolean isTupleType() {
+               return false;
+       }
+
+       @Override
+       public int getArity() {
+               return 0;
+       }
+
+       @Override
+       public int getTotalFields() {
+               return 2;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public Class<Map<K, V>> getTypeClass() {
+               return (Class<Map<K, V>>)(Class<?>)Map.class;
+       }
+
+       @Override
+       public boolean isKeyType() {
+               return false;
+       }
+
+       @Override
+       public TypeSerializer<Map<K, V>> createSerializer(ExecutionConfig 
config) {
+               TypeSerializer<K> keyTypeSerializer = 
keyTypeInfo.createSerializer(config);
+               TypeSerializer<V> valueTypeSerializer = 
valueTypeInfo.createSerializer(config);
+               
+               return new MapSerializer<>(keyTypeSerializer, 
valueTypeSerializer);
+       }
+
+       @Override
+       public String toString() {
+               return "Map<" + keyTypeInfo + ", " + valueTypeInfo + ">";
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               } else if (obj instanceof MapTypeInfo) {
+                       @SuppressWarnings("unchecked")
+                       MapTypeInfo<K, V> other = (MapTypeInfo<K, V>) obj;
+                       
+                       return (other.canEqual(this) && 
+                                       keyTypeInfo.equals(other.keyTypeInfo) 
&& valueTypeInfo.equals(other.valueTypeInfo));
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return 31 * keyTypeInfo.hashCode() + valueTypeInfo.hashCode();
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return (obj != null && obj.getClass() == getClass());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
new file mode 100644
index 0000000..9d1b105
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class MapStateDescriptorTest {
+       
+       @Test
+       public void testMapStateDescriptorEagerSerializer() throws Exception {
+
+               TypeSerializer<Integer> keySerializer = new 
KryoSerializer<>(Integer.class, new ExecutionConfig());
+               TypeSerializer<String> valueSerializer = new 
KryoSerializer<>(String.class, new ExecutionConfig());
+               
+               MapStateDescriptor<Integer, String> descr = 
+                               new MapStateDescriptor<>("testName", 
keySerializer, valueSerializer);
+               
+               assertEquals("testName", descr.getName());
+               assertNotNull(descr.getSerializer());
+               assertTrue(descr.getSerializer() instanceof MapSerializer);
+               assertNotNull(descr.getKeySerializer());
+               assertEquals(keySerializer, descr.getKeySerializer());
+               assertNotNull(descr.getValueSerializer());
+               assertEquals(valueSerializer, descr.getValueSerializer());
+
+               MapStateDescriptor<Integer, String> copy = 
CommonTestUtils.createCopySerializable(descr);
+
+               assertEquals("testName", copy.getName());
+               assertNotNull(copy.getSerializer());
+               assertTrue(copy.getSerializer() instanceof MapSerializer);
+
+               assertNotNull(copy.getKeySerializer());
+               assertEquals(keySerializer, copy.getKeySerializer());
+               assertNotNull(copy.getValueSerializer());
+               assertEquals(valueSerializer, copy.getValueSerializer());
+       }
+
+       @Test
+       public void testMapStateDescriptorLazySerializer() throws Exception {
+               // some different registered value
+               ExecutionConfig cfg = new ExecutionConfig();
+               cfg.registerKryoType(TaskInfo.class);
+
+               MapStateDescriptor<Path, String> descr =
+                               new MapStateDescriptor<>("testName", 
Path.class, String.class);
+               
+               try {
+                       descr.getSerializer();
+                       fail("should cause an exception");
+               } catch (IllegalStateException ignored) {}
+
+               descr.initializeSerializerUnlessSet(cfg);
+
+               assertNotNull(descr.getSerializer());
+               assertTrue(descr.getSerializer() instanceof MapSerializer);
+
+               assertNotNull(descr.getKeySerializer());
+               assertTrue(descr.getKeySerializer() instanceof KryoSerializer);
+
+               assertTrue(((KryoSerializer<?>) 
descr.getKeySerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 
0);
+               
+               assertNotNull(descr.getValueSerializer());
+               assertTrue(descr.getValueSerializer() instanceof 
StringSerializer);
+       }
+
+       @Test
+       public void testMapStateDescriptorAutoSerializer() throws Exception {
+
+               MapStateDescriptor<String, Long> descr =
+                               new MapStateDescriptor<>("testName", 
String.class, Long.class);
+
+               MapStateDescriptor<String, Long> copy = 
CommonTestUtils.createCopySerializable(descr);
+
+               assertEquals("testName", copy.getName());
+
+               assertNotNull(copy.getSerializer());
+               assertTrue(copy.getSerializer() instanceof MapSerializer);
+
+               assertNotNull(copy.getKeySerializer());
+               assertEquals(StringSerializer.INSTANCE, 
copy.getKeySerializer());
+               assertNotNull(copy.getValueSerializer());
+               assertEquals(LongSerializer.INSTANCE, 
copy.getValueSerializer());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerTest.java
new file mode 100644
index 0000000..9ce7de1
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+/**
+ * A test for the {@link MapSerializer}.
+ */
+public class MapSerializerTest extends SerializerTestBase<Map<Long, String>> {
+
+       @Override
+       protected TypeSerializer<Map<Long, String>> createSerializer() {
+               return new MapSerializer<>(LongSerializer.INSTANCE, 
StringSerializer.INSTANCE);
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       protected Class<Map<Long, String>> getTypeClass() {
+               return (Class<Map<Long, String>>) (Class<?>) Map.class;
+       }
+
+       @SuppressWarnings({"rawtypes", "unchecked"})
+       @Override
+       protected Map<Long, String>[] getTestData() {
+               final Random rnd = new Random(123654789);
+
+               // empty maps
+               final Map<Long, String> map1 = Collections.emptyMap(); 
+               final Map<Long, String> map2 = new HashMap<>();
+               final Map<Long, String> map3 = new TreeMap<>();
+
+               // single element maps
+               final Map<Long, String> map4 = Collections.singletonMap(0L, 
"hello");
+               final Map<Long, String> map5 = new HashMap<>();
+               map5.put(12345L, "12345L");
+               final Map<Long, String> map6 = new TreeMap<>();
+               map6.put(777888L, "777888L");
+
+               // longer maps
+               final Map<Long, String> map7 = new HashMap<>();
+               for (int i = 0; i < rnd.nextInt(200); i++) {
+                       map7.put(rnd.nextLong(), Long.toString(rnd.nextLong()));
+               }
+
+               final Map<Long, String> map8 = new TreeMap<>();
+               for (int i = 0; i < rnd.nextInt(200); i++) {
+                       map8.put(rnd.nextLong(), Long.toString(rnd.nextLong()));
+               }
+               
+               // null-value maps
+               final Map<Long, String> map9 = Collections.singletonMap(0L, 
null);
+               final Map<Long, String> map10 = new HashMap<>();
+               map10.put(999L, null);
+               final Map<Long, String> map11 = new TreeMap<>();
+               map11.put(666L, null);
+
+               return (Map<Long, String>[]) new Map[] {
+                               map1, map2, map3, map4, map5, map6, map7, map8, 
map9, map10, map11
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index 109d152..7f8eea8 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -118,6 +118,10 @@ public class FileStateBackendTest extends 
StateBackendTestBase<FsStateBackend> {
        @Override
        @Test
        public void testReducingStateRestoreWithWrongSerializers() {}
+       
+       @Override
+       @Test
+       public void testMapStateRestoreWithWrongSerializers() {}
 
        @Test
        public void testStateOutputStream() {

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
index 2f32861..bc830e1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java
@@ -36,7 +36,9 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Serialization and deserialization of messages exchanged between
@@ -484,6 +486,71 @@ public final class KvStateRequestSerializer {
                        return null;
                }
        }
+       
+       /**
+        * Serializes all values of the Iterable with the given serializer.
+        *
+        * @param entries         Key-value pairs to serialize
+        * @param keySerializer   Serializer for UK
+        * @param valueSerializer Serializer for UV
+        * @param <UK>            Type of the keys
+        * @param <UV>            Type of the values
+        * @return Serialized values or <code>null</code> if values 
<code>null</code> or empty
+        * @throws IOException On failure during serialization
+        */
+       public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> 
entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) 
throws IOException {
+               if (entries != null) {
+                       // Serialize
+                       DataOutputSerializer dos = new DataOutputSerializer(32);
+
+                       for (Map.Entry<UK, UV> entry : entries) {
+                               keySerializer.serialize(entry.getKey(), dos);
+
+                               if (entry.getValue() == null) {
+                                       dos.writeBoolean(true);
+                               } else {
+                                       dos.writeBoolean(false);
+                                       
valueSerializer.serialize(entry.getValue(), dos);
+                               }
+                       }
+
+                       return dos.getCopyOfBuffer();
+               } else {
+                       return null;
+               }
+       }
+       
+       /**
+        * Deserializes all kv pairs with the given serializer.
+        *
+        * @param serializedValue Serialized value of type Map<UK, UV>
+        * @param keySerializer   Serializer for UK
+        * @param valueSerializer Serializer for UV
+        * @param <UK>            Type of the key
+        * @param <UV>            Type of the value.
+        * @return Deserialized map or <code>null</code> if the serialized value
+        * is <code>null</code>
+        * @throws IOException On failure during deserialization
+        */
+       public static <UK, UV> Map<UK, UV> deserializeMap(byte[] 
serializedValue, TypeSerializer<UK> keySerializer, TypeSerializer<UV> 
valueSerializer) throws IOException {
+               if (serializedValue != null) {
+                       DataInputDeserializer in = new 
DataInputDeserializer(serializedValue, 0, serializedValue.length);
+
+                       Map<UK, UV> result = new HashMap<>();
+                       while (in.available() > 0) {
+                               UK key = keySerializer.deserialize(in);
+
+                               boolean isNull = in.readBoolean();
+                               UV value = isNull ? null : 
valueSerializer.deserialize(in);
+
+                               result.put(key, value);
+                       }
+
+                       return result;
+               } else {
+                       return null;
+               }
+       }
 
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index fe5d1cc..3ed49f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
@@ -39,6 +41,7 @@ import 
org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.Preconditions;
@@ -189,6 +192,20 @@ public abstract class AbstractKeyedStateBackend<K>
                        FoldingStateDescriptor<T, ACC> stateDesc) throws 
Exception;
 
        /**
+        * Creates and returns a new {@link MapState}.
+        *
+        * @param namespaceSerializer TypeSerializer for the state namespace.
+        * @param stateDesc The {@code StateDescriptor} that contains the name 
of the state.
+        *
+        * @param <N> The type of the namespace.
+        * @param <UK> Type of the keys in the state
+        * @param <UV> Type of the values in the state   *
+        */
+       protected abstract <N, UK, UV> InternalMapState<N, UK, UV> 
createMapState(
+                       TypeSerializer<N> namespaceSerializer,
+                       MapStateDescriptor<UK, UV> stateDesc) throws Exception;
+
+       /**
         * @see KeyedStateBackend
         */
        @Override
@@ -285,12 +302,16 @@ public abstract class AbstractKeyedStateBackend<K>
                                        AggregatingStateDescriptor<T, ACC, R> 
stateDesc) throws Exception {
                                return 
AbstractKeyedStateBackend.this.createAggregatingState(namespaceSerializer, 
stateDesc);
                        }
-                       
 
                        @Override
                        public <T, ACC> FoldingState<T, ACC> 
createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
                                return 
AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, 
stateDesc);
                        }
+                       
+                       @Override
+                       public <UK, UV> MapState<UK, UV> 
createMapState(MapStateDescriptor<UK, UV> stateDesc) throws Exception {
+                               return 
AbstractKeyedStateBackend.this.createMapState(namespaceSerializer, stateDesc);
+                       }
 
                });
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
index d8b8aa8..a32cebd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
@@ -25,6 +25,8 @@ import 
org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
@@ -93,6 +95,18 @@ public class DefaultKeyedStateStore implements 
KeyedStateStore {
                }
        }
 
+       @Override
+       public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> 
stateProperties) {
+               requireNonNull(stateProperties, "The state properties must not 
be null");
+               try {
+                       
stateProperties.initializeSerializerUnlessSet(executionConfig);
+                       MapState<UK, UV> originalState = 
getPartitionedState(stateProperties);
+                       return new UserFacingMapState<>(originalState);
+               } catch (Exception e) {
+                       throw new RuntimeException("Error while getting state", 
e);
+               }
+       }
+
        private <S extends State> S getPartitionedState(StateDescriptor<S, ?> 
stateDescriptor) throws Exception {
                return keyedStateBackend.getPartitionedState(
                                VoidNamespace.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
new file mode 100644
index 0000000..61cc58c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A serializer for {@link HashMap}. The serializer relies on a key serializer 
and a value serializer
+ * for the serialization of the map's key-value pairs.
+ *
+ * <p>The serialization format for the map is as follows: four bytes for the 
length of the map,
+ * followed by the serialized representation of each key-value pair. To allow 
null values, each value
+ * is prefixed by a null marker.
+ *
+ * @param <K> The type of the keys in the map.
+ * @param <V> The type of the values in the map.
+ */
+public class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> {
+
+       private static final long serialVersionUID = -6885593032367050078L;
+       
+       /** The serializer for the keys in the map */
+       private final TypeSerializer<K> keySerializer;
+
+       /** The serializer for the values in the map */
+       private final TypeSerializer<V> valueSerializer;
+
+       /**
+        * Creates a map serializer that uses the given serializers to 
serialize the key-value pairs in the map.
+        *
+        * @param keySerializer The serializer for the keys in the map
+        * @param valueSerializer The serializer for the values in the map
+        */
+       public HashMapSerializer(TypeSerializer<K> keySerializer, 
TypeSerializer<V> valueSerializer) {
+               this.keySerializer = Preconditions.checkNotNull(keySerializer, 
"The key serializer cannot be null");
+               this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer, "The value serializer cannot be 
null.");
+       }
+
+       // 
------------------------------------------------------------------------
+       //  HashMapSerializer specific properties
+       // 
------------------------------------------------------------------------
+
+       public TypeSerializer<K> getKeySerializer() {
+               return keySerializer;
+       }
+
+       public TypeSerializer<V> getValueSerializer() {
+               return valueSerializer;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Type Serializer implementation
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public boolean isImmutableType() {
+               return false;
+       }
+
+       @Override
+       public TypeSerializer<HashMap<K, V>> duplicate() {
+               TypeSerializer<K> duplicateKeySerializer = 
keySerializer.duplicate();
+               TypeSerializer<V> duplicateValueSerializer = 
valueSerializer.duplicate();
+
+               return new HashMapSerializer<>(duplicateKeySerializer, 
duplicateValueSerializer);
+       }
+
+       @Override
+       public HashMap<K, V> createInstance() {
+               return new HashMap<>();
+       }
+
+       @Override
+       public HashMap<K, V> copy(HashMap<K, V> from) {
+               HashMap<K, V> newHashMap = new HashMap<>(from.size());
+
+               for (Map.Entry<K, V> entry : from.entrySet()) {
+                       K newKey = keySerializer.copy(entry.getKey());
+                       V newValue = entry.getValue() == null ? null : 
valueSerializer.copy(entry.getValue());
+
+                       newHashMap.put(newKey, newValue);
+               }
+
+               return newHashMap;
+       }
+
+       @Override
+       public HashMap<K, V> copy(HashMap<K, V> from, HashMap<K, V> reuse) {
+               return copy(from);
+       }
+
+       @Override
+       public int getLength() {
+               return -1; // var length
+       }
+
+       @Override
+       public void serialize(HashMap<K, V> map, DataOutputView target) throws 
IOException {
+               final int size = map.size();
+               target.writeInt(size);
+
+               for (Map.Entry<K, V> entry : map.entrySet()) {
+                       keySerializer.serialize(entry.getKey(), target);
+
+                       if (entry.getValue() == null) {
+                               target.writeBoolean(true);
+                       } else {
+                               target.writeBoolean(false);
+                               valueSerializer.serialize(entry.getValue(), 
target);
+                       }
+               }
+       }
+
+       @Override
+       public HashMap<K, V> deserialize(DataInputView source) throws 
IOException {
+               final int size = source.readInt();
+
+               final HashMap<K, V> map = new HashMap<>(size);
+               for (int i = 0; i < size; ++i) {
+                       K key = keySerializer.deserialize(source);
+
+                       boolean isNull = source.readBoolean();
+                       V value = isNull ? null : 
valueSerializer.deserialize(source);
+
+                       map.put(key, value);
+               }
+
+               return map;
+       }
+
+       @Override
+       public HashMap<K, V> deserialize(HashMap<K, V> reuse, DataInputView 
source) throws IOException {
+               return deserialize(source);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               final int size = source.readInt();
+               target.writeInt(size);
+
+               for (int i = 0; i < size; ++i) {
+                       keySerializer.copy(source, target);
+                       
+                       boolean isNull = source.readBoolean();
+                       target.writeBoolean(isNull);
+                       
+                       if (!isNull) {
+                               valueSerializer.copy(source, target);
+                       }
+               }
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               return obj == this ||
+                               (obj != null && obj.getClass() == getClass() &&
+                                               
keySerializer.equals(((HashMapSerializer<?, ?>) obj).getKeySerializer()) &&
+                                               
valueSerializer.equals(((HashMapSerializer<?, ?>) obj).getValueSerializer()));
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return (obj != null && obj.getClass() == getClass());
+       }
+
+       @Override
+       public int hashCode() {
+               return keySerializer.hashCode() * 31 + 
valueSerializer.hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
new file mode 100644
index 0000000..6cddf6d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.MapState;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Simple wrapper map state that exposes empty state properly as an empty map.
+ * 
+ * @param <K> The type of keys in the map state.
+ * @param <V> The type of values in the map state.
+ */
+class UserFacingMapState<K, V> implements MapState<K, V> {
+
+       private final MapState<K, V> originalState;
+
+       private final Map<K, V> emptyState = Collections.<K, V>emptyMap();
+
+       UserFacingMapState(MapState<K, V> originalState) {
+               this.originalState = originalState;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public V get(K key) throws Exception {
+               return originalState.get(key);
+       }
+
+       @Override
+       public void put(K key, V value) throws Exception {
+               originalState.put(key, value);
+       }
+
+       @Override
+       public void putAll(Map<K, V> value) throws Exception {
+               originalState.putAll(value);
+       }
+
+       @Override
+       public void clear() {
+               originalState.clear();
+       }
+
+       @Override
+       public void remove(K key) throws Exception {
+               originalState.remove(key);
+       }
+
+       @Override
+       public boolean contains(K key) throws Exception {
+               return originalState.contains(key);
+       }
+
+       @Override
+       public int size() throws Exception {
+               return originalState.size();
+       }
+
+       @Override
+       public Iterable<Map.Entry<K, V>> entries() throws Exception {
+               Iterable<Map.Entry<K, V>> original = originalState.entries();
+               return original != null ? original : emptyState.entrySet();
+       }
+
+       @Override
+       public Iterable<K> keys() throws Exception {
+               Iterable<K> original = originalState.keys();
+               return original != null ? original : emptyState.keySet();
+       }
+
+       @Override
+       public Iterable<V> values() throws Exception {
+               Iterable<V> original = originalState.values();
+               return original != null ? original : emptyState.values();
+       }
+
+       @Override
+       public Iterator<Map.Entry<K, V>> iterator() throws Exception {
+               Iterator<Map.Entry<K, V>> original = originalState.iterator();
+               return original != null ? original : 
emptyState.entrySet().iterator();
+       }
+}

Reply via email to