[FLINK-4856] Add MapState for keyed state
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30c9e2b6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30c9e2b6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30c9e2b6 Branch: refs/heads/master Commit: 30c9e2b683bf7f4776ffc23b6a860946a4429ae5 Parents: de2605e Author: xiaogang.sxg <[email protected]> Authored: Fri Feb 17 11:19:18 2017 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Thu Feb 23 16:56:29 2017 +0100 ---------------------------------------------------------------------- docs/dev/stream/state.md | 8 +- .../streaming/state/AbstractRocksDBState.java | 49 +- .../state/RocksDBKeyedStateBackend.java | 10 + .../streaming/state/RocksDBMapState.java | 546 +++++++++++++++++++ .../api/common/functions/RuntimeContext.java | 42 ++ .../util/AbstractRuntimeUDFContext.java | 9 + .../flink/api/common/state/KeyedStateStore.java | 40 ++ .../apache/flink/api/common/state/MapState.java | 134 +++++ .../api/common/state/MapStateDescriptor.java | 147 +++++ .../flink/api/common/state/StateBinder.java | 9 + .../flink/api/common/state/StateDescriptor.java | 2 +- .../common/typeutils/base/MapSerializer.java | 193 +++++++ .../flink/api/java/typeutils/MapTypeInfo.java | 147 +++++ .../common/state/MapStateDescriptorTest.java | 115 ++++ .../typeutils/base/MapSerializerTest.java | 90 +++ .../flink/hdfstests/FileStateBackendTest.java | 4 + .../netty/message/KvStateRequestSerializer.java | 67 +++ .../state/AbstractKeyedStateBackend.java | 23 +- .../runtime/state/DefaultKeyedStateStore.java | 14 + .../flink/runtime/state/HashMapSerializer.java | 193 +++++++ .../flink/runtime/state/UserFacingMapState.java | 103 ++++ .../state/heap/HeapKeyedStateBackend.java | 18 +- .../flink/runtime/state/heap/HeapMapState.java | 311 +++++++++++ .../state/internal/InternalMapState.java | 32 ++ .../message/KvStateRequestSerializerTest.java | 131 +++++ .../runtime/state/FileStateBackendTest.java | 4 + 
.../runtime/state/MemoryStateBackendTest.java | 4 + .../runtime/state/SerializationProxiesTest.java | 3 +- .../runtime/state/StateBackendTestBase.java | 299 ++++++++++ .../api/functions/async/RichAsyncFunction.java | 7 + .../api/operators/StreamingRuntimeContext.java | 9 + .../functions/async/RichAsyncFunctionTest.java | 8 +- .../operators/StreamingRuntimeContextTest.java | 85 ++- .../KVStateRequestSerializerRocksDBTest.java | 40 ++ 34 files changed, 2887 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/docs/dev/stream/state.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md index e554e29..40522e1 100644 --- a/docs/dev/stream/state.md +++ b/docs/dev/stream/state.md @@ -118,6 +118,11 @@ added to the state. Contrary to `ReducingState`, the aggregate type may be diffe of elements that are added to the state. The interface is the same as for `ListState` but elements added using `add(T)` are folded into an aggregate using a specified `FoldFunction`. +* `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and +retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or +`putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable +views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively. + All types of state also have a method `clear()` that clears the state for the currently active key, i.e. the key of the input element. @@ -136,7 +141,7 @@ To get a state handle, you have to create a `StateDescriptor`. This holds the na that you can reference them), the type of the values that the state holds, and possibly a user-specified function, such as a `ReduceFunction`. 
Depending on what type of state you want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor`, -a `ReducingStateDescriptor` or a `FoldingStateDescriptor`. +a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a `MapStateDescriptor`. State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*. Please see [here]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) for @@ -147,6 +152,7 @@ is available in a `RichFunction` has these methods for accessing state: * `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)` * `ListState<T> getListState(ListStateDescriptor<T>)` * `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)` +* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)` This is an example `FlatMapFunction` that shows how all of the parts fit together: http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 89f41aa..569971a 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -21,7 +21,10 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import 
org.apache.flink.core.memory.ByteArrayInputStreamWithPos; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; @@ -50,7 +53,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta implements InternalKvState<N>, State { /** Serializer for the namespace */ - private final TypeSerializer<N> namespaceSerializer; + final TypeSerializer<N> namespaceSerializer; /** The current namespace, which the next value methods will refer to */ private N currentNamespace; @@ -215,4 +218,48 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta value >>>= 8; } while (value != 0); } + + protected Tuple3<Integer, K, N> readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException { + int keyGroup = readKeyGroup(inputView); + K key = readKey(inputStream, inputView); + N namespace = readNamespace(inputStream, inputView); + + return new Tuple3<>(keyGroup, key, namespace); + } + + private int readKeyGroup(DataInputView inputView) throws IOException { + int keyGroup = 0; + for (int i = 0; i < backend.getKeyGroupPrefixBytes(); ++i) { + keyGroup <<= 8; + keyGroup |= (inputView.readByte() & 0xFF); + } + return keyGroup; + } + + private K readKey(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException { + int beforeRead = inputStream.getPosition(); + K key = backend.getKeySerializer().deserialize(inputView); + if (ambiguousKeyPossible) { + int length = inputStream.getPosition() - beforeRead; + readVariableIntBytes(inputView, length); + } + return key; + } + + private N readNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException { + int beforeRead = 
inputStream.getPosition(); + N namespace = namespaceSerializer.deserialize(inputView); + if (ambiguousKeyPossible) { + int length = inputStream.getPosition() - beforeRead; + readVariableIntBytes(inputView, length); + } + return namespace; + } + + private void readVariableIntBytes(DataInputView inputView, int value) throws IOException { + do { + inputView.readByte(); + value >>>= 8; + } while (value != 0); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index d8d77b6..a0efe78 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; @@ -53,6 +54,7 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.internal.InternalAggregatingState; import org.apache.flink.runtime.state.internal.InternalFoldingState; import 
org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.runtime.util.SerializableObject; @@ -882,6 +884,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this); } + @Override + protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer, + MapStateDescriptor<UK, UV> stateDesc) throws Exception { + ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + + return new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this); + } + /** * Wraps a RocksDB iterator to cache it's current key and assign an id for the key/value state to the iterator. * Used by #MergeIterator. http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java new file mode 100644 index 0000000..e9e9d9b --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -0,0 +1,546 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; + +/** + * {@link MapState} implementation that stores state in RocksDB. 
+ * <p> + * <p>{@link RocksDBStateBackend} must ensure that we set the + * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since + * we use the {@code merge()} call. + * + * @param <K> The type of the key. + * @param <N> The type of the namespace. + * @param <UK> The type of the keys in the map state. + * @param <UV> The type of the values in the map state. + */ +public class RocksDBMapState<K, N, UK, UV> + extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>> + implements InternalMapState<N, UK, UV> { + + private static Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class); + + /** Serializer for the keys and values */ + private final TypeSerializer<UK> userKeySerializer; + private final TypeSerializer<UV> userValueSerializer; + + /** + * We disable writes to the write-ahead-log here. We can't have these in the base class + * because JNI segfaults for some reason if they are. + */ + private final WriteOptions writeOptions; + + /** + * Creates a new {@code RocksDBMapState}. + * + * @param namespaceSerializer The serializer for the namespace. + * @param stateDesc The state identifier for the state. 
+ */ + public RocksDBMapState(ColumnFamilyHandle columnFamily, + TypeSerializer<N> namespaceSerializer, + MapStateDescriptor<UK, UV> stateDesc, + RocksDBKeyedStateBackend<K> backend) { + + super(columnFamily, namespaceSerializer, stateDesc, backend); + + this.userKeySerializer = stateDesc.getKeySerializer(); + this.userValueSerializer = stateDesc.getValueSerializer(); + + writeOptions = new WriteOptions(); + writeOptions.setDisableWAL(true); + } + + // ------------------------------------------------------------------------ + // MapState Implementation + // ------------------------------------------------------------------------ + + @Override + public UV get(UK userKey) throws IOException, RocksDBException { + byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey); + byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes); + + return (rawValueBytes == null ? null : deserializeUserValue(rawValueBytes)); + } + + @Override + public void put(UK userKey, UV userValue) throws IOException, RocksDBException { + + byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey); + byte[] rawValueBytes = serializeUserValue(userValue); + + backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes); + } + + @Override + public void putAll(Map<UK, UV> map) throws IOException, RocksDBException { + if (map == null) { + return; + } + + for (Map.Entry<UK, UV> entry : map.entrySet()) { + put(entry.getKey(), entry.getValue()); + } + } + + @Override + public void remove(UK userKey) throws IOException, RocksDBException { + byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey); + + backend.db.remove(columnFamily, writeOptions, rawKeyBytes); + } + + @Override + public boolean contains(UK userKey) throws IOException, RocksDBException { + byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey); + byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes); + + return (rawValueBytes != null); + } + + 
@Override + public int size() throws IOException, RocksDBException { + Iterator<Map.Entry<UK, UV>> iterator = iterator(); + + int count = 0; + while (iterator.hasNext()) { + count++; + iterator.next(); + } + + return count; + } + + @Override + public Iterable<Map.Entry<UK, UV>> entries() throws IOException, RocksDBException { + final Iterator<Map.Entry<UK, UV>> iterator = iterator(); + + // Return null to make the behavior consistent with other states. + if (!iterator.hasNext()) { + return null; + } else { + return new Iterable<Map.Entry<UK, UV>>() { + @Override + public Iterator<Map.Entry<UK, UV>> iterator() { + return iterator; + } + }; + } + } + + @Override + public Iterable<UK> keys() throws IOException, RocksDBException { + final byte[] prefixBytes = serializeCurrentKeyAndNamespace(); + + return new Iterable<UK>() { + @Override + public Iterator<UK> iterator() { + return new RocksDBMapIterator<UK>(backend.db, prefixBytes) { + @Override + public UK next() { + RocksDBMapEntry entry = nextEntry(); + return (entry == null ? null : entry.getKey()); + } + }; + } + }; + } + + @Override + public Iterable<UV> values() throws IOException, RocksDBException { + final byte[] prefixBytes = serializeCurrentKeyAndNamespace(); + + return new Iterable<UV>() { + @Override + public Iterator<UV> iterator() { + return new RocksDBMapIterator<UV>(backend.db, prefixBytes) { + @Override + public UV next() { + RocksDBMapEntry entry = nextEntry(); + return (entry == null ? 
null : entry.getValue()); + } + }; + } + }; + } + + @Override + public Iterator<Map.Entry<UK, UV>> iterator() throws IOException, RocksDBException { + final byte[] prefixBytes = serializeCurrentKeyAndNamespace(); + + return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes) { + @Override + public Map.Entry<UK, UV> next() { + return nextEntry(); + } + }; + } + + @Override + public void clear() { + try { + Iterator<Map.Entry<UK, UV>> iterator = iterator(); + + while (iterator.hasNext()) { + iterator.next(); + iterator.remove(); + } + } catch (Exception e) { + LOG.warn("Error while cleaning the state.", e); + } + } + + @Override + @SuppressWarnings("unchecked") + public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception { + Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace"); + + //TODO make KvStateRequestSerializer key-group aware to save this round trip and key-group computation + Tuple2<K, N> des = KvStateRequestSerializer.deserializeKeyAndNamespace( + serializedKeyAndNamespace, + backend.getKeySerializer(), + namespaceSerializer); + + int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups()); + + ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128); + DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream); + writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1, outputStream, outputView); + final byte[] keyPrefixBytes = outputStream.toByteArray(); + + final Iterator<Map.Entry<UK, UV>> iterator = new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, keyPrefixBytes) { + @Override + public Map.Entry<UK, UV> next() { + return nextEntry(); + } + }; + + // Return null to make the behavior consistent with other backends + if (!iterator.hasNext()) { + return null; + } + + return KvStateRequestSerializer.serializeMap(new Iterable<Map.Entry<UK, UV>>() { + @Override + public Iterator<Map.Entry<UK, UV>> 
iterator() { + return iterator; + } + }, userKeySerializer, userValueSerializer); + } + + // ------------------------------------------------------------------------ + // Serialization Methods + // ------------------------------------------------------------------------ + + private byte[] serializeCurrentKeyAndNamespace() throws IOException { + writeCurrentKeyWithGroupAndNamespace(); + + return keySerializationStream.toByteArray(); + } + + private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException { + writeCurrentKeyWithGroupAndNamespace(); + userKeySerializer.serialize(userKey, keySerializationDataOutputView); + + return keySerializationStream.toByteArray(); + } + + private byte[] serializeUserValue(UV userValue) throws IOException { + keySerializationStream.reset(); + + if (userValue == null) { + keySerializationDataOutputView.writeBoolean(true); + } else { + keySerializationDataOutputView.writeBoolean(false); + userValueSerializer.serialize(userValue, keySerializationDataOutputView); + } + + + return keySerializationStream.toByteArray(); + } + + private UK deserializeUserKey(byte[] rawKeyBytes) throws IOException { + ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes); + DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); + + readKeyWithGroupAndNamespace(bais, in); + + return userKeySerializer.deserialize(in); + } + + private UV deserializeUserValue(byte[] rawValueBytes) throws IOException { + ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawValueBytes); + DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); + + boolean isNull = in.readBoolean(); + + return isNull ? 
null : userValueSerializer.deserialize(in); + } + + // ------------------------------------------------------------------------ + // Internal Classes + // ------------------------------------------------------------------------ + + /** A map entry in RocksDBMapState */ + private class RocksDBMapEntry implements Map.Entry<UK, UV> { + private final RocksDB db; + + /** The raw bytes of the key stored in RocksDB. Each user key is stored in RocksDB + * with the format #KeyGroup#Key#Namespace#UserKey. */ + private final byte[] rawKeyBytes; + + /** The raw bytes of the value stored in RocksDB */ + private byte[] rawValueBytes; + + /** True if the entry has been deleted. */ + private boolean deleted; + + /** The user key and value. The deserialization is performed lazily, i.e. the key + * and the value is deserialized only when they are accessed. */ + private UK userKey = null; + private UV userValue = null; + + RocksDBMapEntry(final RocksDB db, final byte[] rawKeyBytes, final byte[] rawValueBytes) { + this.db = db; + + this.rawKeyBytes = rawKeyBytes; + this.rawValueBytes = rawValueBytes; + this.deleted = false; + } + + public void remove() { + deleted = true; + rawValueBytes = null; + + try { + db.remove(columnFamily, writeOptions, rawKeyBytes); + } catch (RocksDBException e) { + throw new RuntimeException("Error while removing data from RocksDB.", e); + } + } + + @Override + public UK getKey() { + if (userKey == null) { + try { + userKey = deserializeUserKey(rawKeyBytes); + } catch (IOException e) { + throw new RuntimeException("Error while deserializing the user key."); + } + } + + return userKey; + } + + @Override + public UV getValue() { + if (deleted) { + return null; + } else { + if (userValue == null) { + try { + userValue = deserializeUserValue(rawValueBytes); + } catch (IOException e) { + throw new RuntimeException("Error while deserializing the user value."); + } + } + + return userValue; + } + } + + @Override + public UV setValue(UV value) { + if (deleted) { + 
throw new IllegalStateException("The value has already been deleted."); + } + + UV oldValue = getValue(); + + try { + userValue = value; + rawValueBytes = serializeUserValue(value); + + db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes); + } catch (IOException | RocksDBException e) { + throw new RuntimeException("Error while putting data into RocksDB.", e); + } + + return oldValue; + } + } + + /** An auxiliary utility to scan all entries under the given key. */ + private abstract class RocksDBMapIterator<T> implements Iterator<T> { + + final static int CACHE_SIZE_BASE = 1; + final static int CACHE_SIZE_LIMIT = 128; + + /** The db where data resides. */ + private final RocksDB db; + + /** + * The prefix bytes of the key being accessed. All entries under the same key + * have the same prefix, hence we can stop iterating once we come across an + * entry with a different prefix. + */ + private final byte[] keyPrefixBytes; + + /** + * True if all entries have been accessed or the iterator has come across an + * entry with a different prefix. + */ + private boolean expired = false; + + /** An in-memory cache for the entries in RocksDB. 
*/ + private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>(); + private int cacheIndex = 0; + + + RocksDBMapIterator(final RocksDB db, final byte[] keyPrefixBytes) { + this.db = db; + this.keyPrefixBytes = keyPrefixBytes; + } + + @Override + public boolean hasNext() { + loadCache(); + + return (cacheIndex < cacheEntries.size()); + } + + @Override + public void remove() { + if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) { + throw new IllegalStateException("The remove operation must be called after an valid next operation."); + } + + RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1); + lastEntry.remove(); + } + + final RocksDBMapEntry nextEntry() { + loadCache(); + + if (cacheIndex == cacheEntries.size()) { + if (!expired) { + throw new IllegalStateException(); + } + + return null; + } + + RocksDBMapEntry entry = cacheEntries.get(cacheIndex); + cacheIndex++; + + return entry; + } + + private void loadCache() { + if (cacheIndex > cacheEntries.size()) { + throw new IllegalStateException(); + } + + // Load cache entries only when the cache is empty and there still exist unread entries + if (cacheIndex < cacheEntries.size() || expired) { + return; + } + + RocksIterator iterator = db.newIterator(columnFamily); + + /* + * The iteration starts from the prefix bytes at the first loading. The cache then is + * reloaded when the next entry to return is the last one in the cache. At that time, + * we will start the iterating from the last returned entry. + */ + RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1); + byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes); + int numEntries = (lastEntry == null ? CACHE_SIZE_BASE : Math.min(cacheEntries.size() * 2, CACHE_SIZE_LIMIT)); + + cacheEntries.clear(); + cacheIndex = 0; + + iterator.seek(startBytes); + + /* + * If the last returned entry is not deleted, it will be the first entry in the + * iterating. 
Skip it to avoid redundant access in such cases. + */ + if (lastEntry != null && !lastEntry.deleted) { + iterator.next(); + } + + while (true) { + if (!iterator.isValid() || !underSameKey(iterator.key())) { + expired = true; + break; + } + + if (cacheEntries.size() >= numEntries) { + break; + } + + RocksDBMapEntry entry = new RocksDBMapEntry(db, iterator.key(), iterator.value()); + cacheEntries.add(entry); + + iterator.next(); + } + + iterator.close(); + } + + private boolean underSameKey(byte[] rawKeyBytes) { + if (rawKeyBytes.length < keyPrefixBytes.length) { + return false; + } + + for (int i = 0; i < keyPrefixBytes.length; ++i) { + if (rawKeyBytes[i] != keyPrefixBytes[i]) { + return false; + } + } + + return true; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index 405e390..98ad018 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -31,6 +31,8 @@ import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueState; @@ -387,4 +389,44 @@ public interface RuntimeContext { */ @PublicEvolving <T, ACC> 
FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties); + + /** + * Gets a handle to the system's key/value map state. This state is similar to the state + * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that + * is composed of user-defined key-value pairs. + * + * <p>This state is only accessible if the function is executed on a KeyedStream. + * + * <pre>{@code + * DataStream<MyType> stream = ...; + * KeyedStream<MyType> keyedStream = stream.keyBy("id"); + * + * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() { + * + * private MapState<MyType, Long> state; + * + * public void open(Configuration cfg) { + * state = getRuntimeContext().getMapState( + * new MapStateDescriptor<>("sum", MyType.class, Long.class)); + * } + * + * public Tuple2<MyType, Long> map(MyType value) { + * return new Tuple2<>(value, state.get(value)); + * } + * }); + * + * }</pre> + * + * @param stateProperties The descriptor defining the properties of the state. + * + * @param <UK> The type of the user keys stored in the state. + * @param <UV> The type of the user values stored in the state. + * + * @return The partitioned state object. + * + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). 
+ */ + @PublicEvolving + <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties); } http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index 0eafeaa..2538799 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -33,6 +33,8 @@ import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueState; @@ -214,4 +216,11 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { throw new UnsupportedOperationException( "This state is only accessible by functions executed on a KeyedStream"); } + + @Override + @PublicEvolving + public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { + throw new UnsupportedOperationException( + "This state is only accessible by functions executed on a KeyedStream"); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java index bbb4c67..2187f6c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java @@ -196,4 +196,44 @@ public interface KeyedStateStore { */ @PublicEvolving <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties); + + /** + * Gets a handle to the system's key/value map state. This state is similar to the state + * accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that + * is composed of user-defined key-value pairs. + * + * <p>This state is only accessible if the function is executed on a KeyedStream. + * + * <pre>{@code + * DataStream<MyType> stream = ...; + * KeyedStream<MyType> keyedStream = stream.keyBy("id"); + * + * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() { + * + * private MapState<MyType, Long> state; + * + * public void open(Configuration cfg) { + * state = getRuntimeContext().getMapState( + * new MapStateDescriptor<>("sum", MyType.class, Long.class)); + * } + * + * public Tuple2<MyType, Long> map(MyType value) { + * return new Tuple2<>(value, state.get(value)); + * } + * }); + * + * }</pre> + * + * @param stateProperties The descriptor defining the properties of the state. + * + * @param <UK> The type of the user keys stored in the state. + * @param <UV> The type of the user values stored in the state. + * + * @return The partitioned state object. + * + * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the + * function (function is not part of a KeyedStream). 
+ */ + @PublicEvolving + <UK, UV> MapState<UK,UV> getMapState(MapStateDescriptor<UK, UV> stateProperties); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java new file mode 100644 index 0000000..fa657ef --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Iterator; +import java.util.Map; + +/** + * {@link State} interface for partitioned key-value state. The key-value pair can be + * added, updated and retrieved. + * + * <p>The state is accessed and modified by user functions, and checkpointed consistently + * by the system as part of the distributed snapshots. + * + * <p>The state is only accessible by functions applied on a KeyedDataStream. 
The key is + * automatically supplied by the system, so the function always sees the value mapped to the + * key of the current element. That way, the system can handle stream and state partitioning + * consistently together. + * + * @param <UK> Type of the keys in the state. + * @param <UV> Type of the values in the state. + */ +@PublicEvolving +public interface MapState<UK, UV> extends State { + + /** + * Returns the current value associated with the given key. + * + * @param key The key of the mapping + * @return The value of the mapping with the given key + * + * @throws Exception Thrown if the system cannot access the state. + */ + UV get(UK key) throws Exception; + + /** + * Associates a new value with the given key. + * + * @param key The key of the mapping + * @param value The new value of the mapping + * + * @throws Exception Thrown if the system cannot access the state. + */ + void put(UK key, UV value) throws Exception; + + /** + * Copies all of the mappings from the given map into the state. + * + * @param map The mappings to be stored in this state + * + * @throws Exception Thrown if the system cannot access the state. + */ + void putAll(Map<UK, UV> map) throws Exception; + + /** + * Deletes the mapping of the given key. + * + * @param key The key of the mapping + * + * @throws Exception Thrown if the system cannot access the state. + */ + void remove(UK key) throws Exception; + + /** + * Returns whether there exists the given mapping. + * + * @param key The key of the mapping + * @return True if there exists a mapping whose key equals to the given key + * + * @throws Exception Thrown if the system cannot access the state. + */ + boolean contains(UK key) throws Exception; + + /** + * @return The number of mappings in the state. + * + * @throws Exception Thrown if the system cannot access the state. 
+ */ + int size() throws Exception; + + /** + * Returns all the mappings in the state + * + * @return An iterable view of all the key-value pairs in the state. + * + * @throws Exception Thrown if the system cannot access the state. + */ + Iterable<Map.Entry<UK, UV>> entries() throws Exception; + + /** + * Returns all the keys in the state + * + * @return An iterable view of all the keys in the state. + * + * @throws Exception Thrown if the system cannot access the state. + */ + Iterable<UK> keys() throws Exception; + + /** + * Returns all the values in the state. + * + * @return An iterable view of all the values in the state. + * + * @throws Exception Thrown if the system cannot access the state. + */ + Iterable<UV> values() throws Exception; + + /** + * Iterates over all the mappings in the state. + * + * @return An iterator over all the mappings in the state + * + * @throws Exception Thrown if the system cannot access the state. + */ + Iterator<Map.Entry<UK, UV>> iterator() throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java new file mode 100644 index 0000000..d4a49f8 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.api.java.typeutils.MapTypeInfo; + +import java.util.Map; + +/** + * A {@link StateDescriptor} for {@link MapState}. This can be used to create state where the type + * is a map that can be updated and iterated over. + * + * <p>Using {@code MapState} is typically more efficient than manually maintaining a map in a + * {@link ValueState}, because the backing implementation can support efficient updates, rather than + * replacing the full map on write. + * + * <p>To create keyed map state (on a KeyedStream), use + * {@link org.apache.flink.api.common.functions.RuntimeContext#getMapState(MapStateDescriptor)}. + * + * @param <UK> The type of the keys that can be added to the map state. + */ +@PublicEvolving +public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> { + + /** + * Create a new {@code MapStateDescriptor} with the given name and the given type serializers. + * + * @param name The name of the {@code MapStateDescriptor}. + * @param keySerializer The type serializer for the keys in the state. + * @param valueSerializer The type serializer for the values in the state. 
+ */ + public MapStateDescriptor(String name, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) { + super(name, new MapSerializer<>(keySerializer, valueSerializer), null); + } + + /** + * Create a new {@code MapStateDescriptor} with the given name and the given type informations. + * + * @param name The name of the {@code MapStateDescriptor}. + * @param keyTypeInfo The type information for the keys in the state. + * @param valueTypeInfo The type information for the values in the state. + */ + public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo) { + super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null); + } + + /** + * Create a new {@code MapStateDescriptor} with the given name and the given type information. + * + * <p>If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #MapStateDescriptor(String, TypeInformation, TypeInformation)} constructor. + * + * @param name The name of the {@code MapStateDescriptor}. + * @param keyClass The class of the type of keys in the state. + * @param valueClass The class of the type of values in the state. + */ + public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) { + super(name, new MapTypeInfo<>(keyClass, valueClass), null); + } + + @Override + public MapState<UK, UV> bind(StateBinder stateBinder) throws Exception { + return stateBinder.createMapState(this); + } + + @Override + public Type getType() { + return Type.MAP; + } + + /** + * Gets the serializer for the keys in the state. + * + * @return The serializer for the keys in the state. 
+ */ + public TypeSerializer<UK> getKeySerializer() { + final TypeSerializer<Map<UK, UV>> rawSerializer = getSerializer(); + if (!(rawSerializer instanceof MapSerializer)) { + throw new IllegalStateException("Unexpected serializer type."); + } + + return ((MapSerializer<UK, UV>) rawSerializer).getKeySerializer(); + } + + /** + * Gets the serializer for the values in the state. + * + * @return The serializer for the values in the state. + */ + public TypeSerializer<UV> getValueSerializer() { + final TypeSerializer<Map<UK, UV>> rawSerializer = getSerializer(); + if (!(rawSerializer instanceof MapSerializer)) { + throw new IllegalStateException("Unexpected serializer type."); + } + + return ((MapSerializer<UK, UV>) rawSerializer).getValueSerializer(); + } + + @Override + public int hashCode() { + int result = serializer.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + MapStateDescriptor<?, ?> that = (MapStateDescriptor<?, ?>) o; + return serializer.equals(that.serializer) && name.equals(that.name); + } + + @Override + public String toString() { + return "MapStateDescriptor{" + + "name=" + name + + ", serializer=" + serializer + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java index 08dfc90..9df7a47 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java @@ -70,4 +70,13 @@ public interface StateBinder { * @param <ACC> Type of the value in 
the state */ <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception; + + /** + * Creates and returns a new {@link MapState}. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param <MK> Type of the keys in the state + * @param <MV> Type of the values in the state + */ + <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> stateDesc) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java index 332e649..a52ea32 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -55,7 +55,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl */ // IMPORTANT: Do not change the order of the elements in this enum, ordinal is used in serialization public enum Type { - @Deprecated UNKNOWN, VALUE, LIST, REDUCING, FOLDING, AGGREGATING + @Deprecated UNKNOWN, VALUE, LIST, REDUCING, FOLDING, AGGREGATING, MAP } private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java new file mode 100644 index 0000000..5e1a3bf --- /dev/null +++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.Map; +import java.util.HashMap; + +/** + * A serializer for {@link Map}. The serializer relies on a key serializer and a value serializer + * for the serialization of the map's key-value pairs. + * + * <p>The serialization format for the map is as follows: four bytes for the length of the map, + * followed by the serialized representation of each key-value pair. To allow null values, each value + * is prefixed by a null marker. + * + * @param <K> The type of the keys in the map. + * @param <V> The type of the values in the map. 
+ */ +public class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> { + + private static final long serialVersionUID = -6885593032367050078L; + + /** The serializer for the keys in the map */ + private final TypeSerializer<K> keySerializer; + + /** The serializer for the values in the map */ + private final TypeSerializer<V> valueSerializer; + + /** + * Creates a map serializer that uses the given serializers to serialize the key-value pairs in the map. + * + * @param keySerializer The serializer for the keys in the map + * @param valueSerializer The serializer for the values in the map + */ + public MapSerializer(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) { + this.keySerializer = Preconditions.checkNotNull(keySerializer, "The key serializer cannot be null"); + this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "The value serializer cannot be null."); + } + + // ------------------------------------------------------------------------ + // MapSerializer specific properties + // ------------------------------------------------------------------------ + + public TypeSerializer<K> getKeySerializer() { + return keySerializer; + } + + public TypeSerializer<V> getValueSerializer() { + return valueSerializer; + } + + // ------------------------------------------------------------------------ + // Type Serializer implementation + // ------------------------------------------------------------------------ + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer<Map<K, V>> duplicate() { + TypeSerializer<K> duplicateKeySerializer = keySerializer.duplicate(); + TypeSerializer<V> duplicateValueSerializer = valueSerializer.duplicate(); + + return new MapSerializer<>(duplicateKeySerializer, duplicateValueSerializer); + } + + @Override + public Map<K, V> createInstance() { + return new HashMap<>(); + } + + @Override + public Map<K, V> copy(Map<K, V> from) { + Map<K, V> newMap = new 
HashMap<>(from.size()); + + for (Map.Entry<K, V> entry : from.entrySet()) { + K newKey = keySerializer.copy(entry.getKey()); + V newValue = entry.getValue() == null ? null : valueSerializer.copy(entry.getValue()); + + newMap.put(newKey, newValue); + } + + return newMap; + } + + @Override + public Map<K, V> copy(Map<K, V> from, Map<K, V> reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; // var length + } + + @Override + public void serialize(Map<K, V> map, DataOutputView target) throws IOException { + final int size = map.size(); + target.writeInt(size); + + for (Map.Entry<K, V> entry : map.entrySet()) { + keySerializer.serialize(entry.getKey(), target); + + if (entry.getValue() == null) { + target.writeBoolean(true); + } else { + target.writeBoolean(false); + valueSerializer.serialize(entry.getValue(), target); + } + } + } + + @Override + public Map<K, V> deserialize(DataInputView source) throws IOException { + final int size = source.readInt(); + + final Map<K, V> map = new HashMap<>(size); + for (int i = 0; i < size; ++i) { + K key = keySerializer.deserialize(source); + + boolean isNull = source.readBoolean(); + V value = isNull ? 
null : valueSerializer.deserialize(source); + + map.put(key, value); + } + + return map; + } + + @Override + public Map<K, V> deserialize(Map<K, V> reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + final int size = source.readInt(); + target.writeInt(size); + + for (int i = 0; i < size; ++i) { + keySerializer.copy(source, target); + + boolean isNull = source.readBoolean(); + target.writeBoolean(isNull); + + if (!isNull) { + valueSerializer.copy(source, target); + } + } + } + + @Override + public boolean equals(Object obj) { + return obj == this || + (obj != null && obj.getClass() == getClass() && + keySerializer.equals(((MapSerializer<?, ?>) obj).getKeySerializer()) && + valueSerializer.equals(((MapSerializer<?, ?>) obj).getValueSerializer())); + } + + @Override + public boolean canEqual(Object obj) { + return (obj != null && obj.getClass() == getClass()); + } + + @Override + public int hashCode() { + return keySerializer.hashCode() * 31 + valueSerializer.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MapTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MapTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MapTypeInfo.java new file mode 100644 index 0000000..ca04e0c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/MapTypeInfo.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.util.Preconditions; + +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Special {@code TypeInformation} used by {@link org.apache.flink.api.common.state.MapStateDescriptor}. + * + * @param <K> The type of the keys in the map. + * @param <V> The type of the values in the map. 
+ */ +@PublicEvolving +public class MapTypeInfo<K, V> extends TypeInformation<Map<K, V>> { + + /* The type information for the keys in the map*/ + private final TypeInformation<K> keyTypeInfo; + + /* The type information for the values in the map */ + private final TypeInformation<V> valueTypeInfo; + + public MapTypeInfo(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo) { + this.keyTypeInfo = Preconditions.checkNotNull(keyTypeInfo, "The key type information cannot be null."); + this.valueTypeInfo = Preconditions.checkNotNull(valueTypeInfo, "The value type information cannot be null."); + } + + public MapTypeInfo(Class<K> keyClass, Class<V> valueClass) { + this.keyTypeInfo = of(checkNotNull(keyClass, "The key class cannot be null.")); + this.valueTypeInfo = of(checkNotNull(valueClass, "The value class cannot be null.")); + } + + // ------------------------------------------------------------------------ + // MapTypeInfo specific properties + // ------------------------------------------------------------------------ + + /** + * Gets the type information for the keys in the map + */ + public TypeInformation<K> getKeyTypeInfo() { + return keyTypeInfo; + } + + /** + * Gets the type information for the values in the map + */ + public TypeInformation<V> getValueTypeInfo() { + return valueTypeInfo; + } + + // ------------------------------------------------------------------------ + // TypeInformation implementation + // ------------------------------------------------------------------------ + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 0; + } + + @Override + public int getTotalFields() { + return 2; + } + + @SuppressWarnings("unchecked") + @Override + public Class<Map<K, V>> getTypeClass() { + return (Class<Map<K, V>>)(Class<?>)Map.class; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override 
+ public TypeSerializer<Map<K, V>> createSerializer(ExecutionConfig config) { + TypeSerializer<K> keyTypeSerializer = keyTypeInfo.createSerializer(config); + TypeSerializer<V> valueTypeSerializer = valueTypeInfo.createSerializer(config); + + return new MapSerializer<>(keyTypeSerializer, valueTypeSerializer); + } + + @Override + public String toString() { + return "Map<" + keyTypeInfo + ", " + valueTypeInfo + ">"; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj instanceof MapTypeInfo) { + @SuppressWarnings("unchecked") + MapTypeInfo<K, V> other = (MapTypeInfo<K, V>) obj; + + return (other.canEqual(this) && + keyTypeInfo.equals(other.keyTypeInfo) && valueTypeInfo.equals(other.valueTypeInfo)); + } else { + return false; + } + } + + @Override + public int hashCode() { + return 31 * keyTypeInfo.hashCode() + valueTypeInfo.hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return (obj != null && obj.getClass() == getClass()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java new file mode 100644 index 0000000..9d1b105 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class MapStateDescriptorTest { + + @Test + public void testMapStateDescriptorEagerSerializer() throws Exception { + + TypeSerializer<Integer> keySerializer = new KryoSerializer<>(Integer.class, new ExecutionConfig()); + TypeSerializer<String> valueSerializer = new KryoSerializer<>(String.class, new ExecutionConfig()); + + MapStateDescriptor<Integer, String> descr = + new MapStateDescriptor<>("testName", keySerializer, valueSerializer); + + assertEquals("testName", descr.getName()); + assertNotNull(descr.getSerializer()); + assertTrue(descr.getSerializer() instanceof MapSerializer); + assertNotNull(descr.getKeySerializer()); + assertEquals(keySerializer, descr.getKeySerializer()); + assertNotNull(descr.getValueSerializer()); + assertEquals(valueSerializer, 
descr.getValueSerializer()); + + MapStateDescriptor<Integer, String> copy = CommonTestUtils.createCopySerializable(descr); + + assertEquals("testName", copy.getName()); + assertNotNull(copy.getSerializer()); + assertTrue(copy.getSerializer() instanceof MapSerializer); + + assertNotNull(copy.getKeySerializer()); + assertEquals(keySerializer, copy.getKeySerializer()); + assertNotNull(copy.getValueSerializer()); + assertEquals(valueSerializer, copy.getValueSerializer()); + } + + @Test + public void testMapStateDescriptorLazySerializer() throws Exception { + // some different registered value + ExecutionConfig cfg = new ExecutionConfig(); + cfg.registerKryoType(TaskInfo.class); + + MapStateDescriptor<Path, String> descr = + new MapStateDescriptor<>("testName", Path.class, String.class); + + try { + descr.getSerializer(); + fail("should cause an exception"); + } catch (IllegalStateException ignored) {} + + descr.initializeSerializerUnlessSet(cfg); + + assertNotNull(descr.getSerializer()); + assertTrue(descr.getSerializer() instanceof MapSerializer); + + assertNotNull(descr.getKeySerializer()); + assertTrue(descr.getKeySerializer() instanceof KryoSerializer); + + assertTrue(((KryoSerializer<?>) descr.getKeySerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0); + + assertNotNull(descr.getValueSerializer()); + assertTrue(descr.getValueSerializer() instanceof StringSerializer); + } + + @Test + public void testMapStateDescriptorAutoSerializer() throws Exception { + + MapStateDescriptor<String, Long> descr = + new MapStateDescriptor<>("testName", String.class, Long.class); + + MapStateDescriptor<String, Long> copy = CommonTestUtils.createCopySerializable(descr); + + assertEquals("testName", copy.getName()); + + assertNotNull(copy.getSerializer()); + assertTrue(copy.getSerializer() instanceof MapSerializer); + + assertNotNull(copy.getKeySerializer()); + assertEquals(StringSerializer.INSTANCE, copy.getKeySerializer()); + assertNotNull(copy.getValueSerializer()); 
+ assertEquals(LongSerializer.INSTANCE, copy.getValueSerializer()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerTest.java new file mode 100644 index 0000000..9ce7de1 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/MapSerializerTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; + +/** + * A test for the {@link MapSerializer}. 
+ */ +public class MapSerializerTest extends SerializerTestBase<Map<Long, String>> { + + @Override + protected TypeSerializer<Map<Long, String>> createSerializer() { + return new MapSerializer<>(LongSerializer.INSTANCE, StringSerializer.INSTANCE); + } + + @Override + protected int getLength() { + return -1; + } + + @SuppressWarnings("unchecked") + @Override + protected Class<Map<Long, String>> getTypeClass() { + return (Class<Map<Long, String>>) (Class<?>) Map.class; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Override + protected Map<Long, String>[] getTestData() { + final Random rnd = new Random(123654789); + + // empty maps + final Map<Long, String> map1 = Collections.emptyMap(); + final Map<Long, String> map2 = new HashMap<>(); + final Map<Long, String> map3 = new TreeMap<>(); + + // single element maps + final Map<Long, String> map4 = Collections.singletonMap(0L, "hello"); + final Map<Long, String> map5 = new HashMap<>(); + map5.put(12345L, "12345L"); + final Map<Long, String> map6 = new TreeMap<>(); + map6.put(777888L, "777888L"); + + // longer maps + final Map<Long, String> map7 = new HashMap<>(); + for (int i = 0; i < rnd.nextInt(200); i++) { + map7.put(rnd.nextLong(), Long.toString(rnd.nextLong())); + } + + final Map<Long, String> map8 = new TreeMap<>(); + for (int i = 0; i < rnd.nextInt(200); i++) { + map8.put(rnd.nextLong(), Long.toString(rnd.nextLong())); + } + + // null-value maps + final Map<Long, String> map9 = Collections.singletonMap(0L, null); + final Map<Long, String> map10 = new HashMap<>(); + map10.put(999L, null); + final Map<Long, String> map11 = new TreeMap<>(); + map11.put(666L, null); + + return (Map<Long, String>[]) new Map[] { + map1, map2, map3, map4, map5, map6, map7, map8, map9, map10, map11 + }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java ---------------------------------------------------------------------- diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java index 109d152..7f8eea8 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java @@ -118,6 +118,10 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> { @Override @Test public void testReducingStateRestoreWithWrongSerializers() {} + + @Override + @Test + public void testMapStateRestoreWithWrongSerializers() {} @Test public void testStateOutputStream() { http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java index 2f32861..bc830e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java @@ -36,7 +36,9 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Serialization and deserialization of messages exchanged between @@ -484,6 +486,71 @@ public final class KvStateRequestSerializer { return null; } } + + /** + * Serializes all key-value pairs of the Iterable with the given serializers.
+ * + * @param entries Key-value pairs to serialize + * @param keySerializer Serializer for UK + * @param valueSerializer Serializer for UV + * @param <UK> Type of the keys + * @param <UV> Type of the values + * @return Serialized entries or <code>null</code> if entries is <code>null</code> + * @throws IOException On failure during serialization + */ + public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException { + if (entries != null) { + // Serialize + DataOutputSerializer dos = new DataOutputSerializer(32); + + for (Map.Entry<UK, UV> entry : entries) { + keySerializer.serialize(entry.getKey(), dos); + + if (entry.getValue() == null) { + dos.writeBoolean(true); + } else { + dos.writeBoolean(false); + valueSerializer.serialize(entry.getValue(), dos); + } + } + + return dos.getCopyOfBuffer(); + } else { + return null; + } + } + + /** + * Deserializes all key-value pairs with the given serializers. + * + * @param serializedValue Serialized value of type {@code Map<UK, UV>} + * @param keySerializer Serializer for UK + * @param valueSerializer Serializer for UV + * @param <UK> Type of the key + * @param <UV> Type of the value. + * @return Deserialized map or <code>null</code> if the serialized value + * is <code>null</code> + * @throws IOException On failure during deserialization + */ + public static <UK, UV> Map<UK, UV> deserializeMap(byte[] serializedValue, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException { + if (serializedValue != null) { + DataInputDeserializer in = new DataInputDeserializer(serializedValue, 0, serializedValue.length); + + Map<UK, UV> result = new HashMap<>(); + while (in.available() > 0) { + UK key = keySerializer.deserialize(in); + + boolean isNull = in.readBoolean(); + UV value = isNull ? 
null : valueSerializer.deserialize(in); + + result.put(key, value); + } + + return result; + } else { + return null; + } + } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index fe5d1cc..3ed49f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -25,6 +25,8 @@ import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.State; @@ -39,6 +41,7 @@ import org.apache.flink.runtime.state.internal.InternalAggregatingState; import org.apache.flink.runtime.state.internal.InternalFoldingState; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.util.Preconditions; @@ -189,6 +192,20 @@ public abstract class AbstractKeyedStateBackend<K> 
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception; /** + * Creates and returns a new {@link MapState}. + * + * @param namespaceSerializer TypeSerializer for the state namespace. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param <N> The type of the namespace. + * @param <UK> Type of the keys in the state + * @param <UV> Type of the values in the state + */ + protected abstract <N, UK, UV> InternalMapState<N, UK, UV> createMapState( + TypeSerializer<N> namespaceSerializer, + MapStateDescriptor<UK, UV> stateDesc) throws Exception; + + /** * @see KeyedStateBackend */ @Override @@ -285,12 +302,16 @@ public abstract class AbstractKeyedStateBackend<K> AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception { return AbstractKeyedStateBackend.this.createAggregatingState(namespaceSerializer, stateDesc); } - @Override public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc); } + + @Override + public <UK, UV> MapState<UK, UV> createMapState(MapStateDescriptor<UK, UV> stateDesc) throws Exception { + return AbstractKeyedStateBackend.this.createMapState(namespaceSerializer, stateDesc); + } }); http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java index d8b8aa8..a32cebd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java @@ -25,6 +25,8 @@ import 
org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.State; @@ -93,6 +95,18 @@ public class DefaultKeyedStateStore implements KeyedStateStore { } } + @Override + public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) { + requireNonNull(stateProperties, "The state properties must not be null"); + try { + stateProperties.initializeSerializerUnlessSet(executionConfig); + MapState<UK, UV> originalState = getPartitionedState(stateProperties); + return new UserFacingMapState<>(originalState); + } catch (Exception e) { + throw new RuntimeException("Error while getting state", e); + } + } + private <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception { return keyedStateBackend.getPartitionedState( VoidNamespace.INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java new file mode 100644 index 0000000..61cc58c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * A serializer for {@link HashMap}. The serializer relies on a key serializer and a value serializer + * for the serialization of the map's key-value pairs. + * + * <p>The serialization format for the map is as follows: four bytes for the length of the map, + * followed by the serialized representation of each key-value pair. To allow null values, each value + * is prefixed by a null marker. + * + * @param <K> The type of the keys in the map. + * @param <V> The type of the values in the map. 
+ */ +public class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> { + + private static final long serialVersionUID = -6885593032367050078L; + + /** The serializer for the keys in the map */ + private final TypeSerializer<K> keySerializer; + + /** The serializer for the values in the map */ + private final TypeSerializer<V> valueSerializer; + + /** + * Creates a map serializer that uses the given serializers to serialize the key-value pairs in the map. + * + * @param keySerializer The serializer for the keys in the map + * @param valueSerializer The serializer for the values in the map + */ + public HashMapSerializer(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) { + this.keySerializer = Preconditions.checkNotNull(keySerializer, "The key serializer cannot be null"); + this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "The value serializer cannot be null."); + } + + // ------------------------------------------------------------------------ + // HashMapSerializer specific properties + // ------------------------------------------------------------------------ + + public TypeSerializer<K> getKeySerializer() { + return keySerializer; + } + + public TypeSerializer<V> getValueSerializer() { + return valueSerializer; + } + + // ------------------------------------------------------------------------ + // Type Serializer implementation + // ------------------------------------------------------------------------ + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer<HashMap<K, V>> duplicate() { + TypeSerializer<K> duplicateKeySerializer = keySerializer.duplicate(); + TypeSerializer<V> duplicateValueSerializer = valueSerializer.duplicate(); + + return new HashMapSerializer<>(duplicateKeySerializer, duplicateValueSerializer); + } + + @Override + public HashMap<K, V> createInstance() { + return new HashMap<>(); + } + + @Override + public HashMap<K, V> copy(HashMap<K, V> 
from) { + HashMap<K, V> newHashMap = new HashMap<>(from.size()); + + for (Map.Entry<K, V> entry : from.entrySet()) { + K newKey = keySerializer.copy(entry.getKey()); + V newValue = entry.getValue() == null ? null : valueSerializer.copy(entry.getValue()); + + newHashMap.put(newKey, newValue); + } + + return newHashMap; + } + + @Override + public HashMap<K, V> copy(HashMap<K, V> from, HashMap<K, V> reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; // var length + } + + @Override + public void serialize(HashMap<K, V> map, DataOutputView target) throws IOException { + final int size = map.size(); + target.writeInt(size); + + for (Map.Entry<K, V> entry : map.entrySet()) { + keySerializer.serialize(entry.getKey(), target); + + if (entry.getValue() == null) { + target.writeBoolean(true); + } else { + target.writeBoolean(false); + valueSerializer.serialize(entry.getValue(), target); + } + } + } + + @Override + public HashMap<K, V> deserialize(DataInputView source) throws IOException { + final int size = source.readInt(); + + final HashMap<K, V> map = new HashMap<>(size); + for (int i = 0; i < size; ++i) { + K key = keySerializer.deserialize(source); + + boolean isNull = source.readBoolean(); + V value = isNull ? 
null : valueSerializer.deserialize(source); + + map.put(key, value); + } + + return map; + } + + @Override + public HashMap<K, V> deserialize(HashMap<K, V> reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + final int size = source.readInt(); + target.writeInt(size); + + for (int i = 0; i < size; ++i) { + keySerializer.copy(source, target); + + boolean isNull = source.readBoolean(); + target.writeBoolean(isNull); + + if (!isNull) { + valueSerializer.copy(source, target); + } + } + } + + @Override + public boolean equals(Object obj) { + return obj == this || + (obj != null && obj.getClass() == getClass() && + keySerializer.equals(((HashMapSerializer<?, ?>) obj).getKeySerializer()) && + valueSerializer.equals(((HashMapSerializer<?, ?>) obj).getValueSerializer())); + } + + @Override + public boolean canEqual(Object obj) { + return (obj != null && obj.getClass() == getClass()); + } + + @Override + public int hashCode() { + return keySerializer.hashCode() * 31 + valueSerializer.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/30c9e2b6/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java new file mode 100644 index 0000000..6cddf6d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.MapState; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; + +/** + * Simple wrapper map state that exposes empty state properly as an empty map. + * + * @param <K> The type of keys in the map state. + * @param <V> The type of values in the map state. + */ +class UserFacingMapState<K, V> implements MapState<K, V> { + + private final MapState<K, V> originalState; + + private final Map<K, V> emptyState = Collections.<K, V>emptyMap(); + + UserFacingMapState(MapState<K, V> originalState) { + this.originalState = originalState; + } + + // ------------------------------------------------------------------------ + + @Override + public V get(K key) throws Exception { + return originalState.get(key); + } + + @Override + public void put(K key, V value) throws Exception { + originalState.put(key, value); + } + + @Override + public void putAll(Map<K, V> value) throws Exception { + originalState.putAll(value); + } + + @Override + public void clear() { + originalState.clear(); + } + + @Override + public void remove(K key) throws Exception { + originalState.remove(key); + } + + @Override + public boolean contains(K key) throws Exception { + return originalState.contains(key); + } + + @Override + public int size() throws Exception { + return originalState.size(); + } + + @Override 
+ public Iterable<Map.Entry<K, V>> entries() throws Exception { + Iterable<Map.Entry<K, V>> original = originalState.entries(); + return original != null ? original : emptyState.entrySet(); + } + + @Override + public Iterable<K> keys() throws Exception { + Iterable<K> original = originalState.keys(); + return original != null ? original : emptyState.keySet(); + } + + @Override + public Iterable<V> values() throws Exception { + Iterable<V> original = originalState.values(); + return original != null ? original : emptyState.values(); + } + + @Override + public Iterator<Map.Entry<K, V>> iterator() throws Exception { + Iterator<Map.Entry<K, V>> original = originalState.iterator(); + return original != null ? original : emptyState.entrySet().iterator(); + } +}
