Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5691#discussion_r174454169
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
---
@@ -140,19 +159,28 @@ public boolean contains(UK userKey) {
}
@Override
- public byte[] getSerializedValue(K key, N namespace) throws IOException {
- Preconditions.checkState(namespace != null, "No namespace given.");
- Preconditions.checkState(key != null, "No key given.");
+ public byte[] getSerializedValue(
+ byte[] serializedKeyAndNamespace,
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<HashMap<UK, UV>> valueSerializer) throws Exception {
- HashMap<UK, UV> result = stateTable.get(key, namespace);
+ Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
- if (null == result) {
+ Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
+ serializedKeyAndNamespace, keySerializer, namespaceSerializer);
+
+ Map<UK, UV> result = stateTable.get(keyAndNamespace.f0, keyAndNamespace.f1);
+
+ if (result == null) {
return null;
}
- TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer();
- TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer();
+ final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) valueSerializer;
--- End diff --
The reason is that RocksDB returns an iterator that is populated lazily as you call next(), while MapSerializer's serialize() expects a Map. If we were to go with your option, we would have to iterate over the entries twice: once to build the Map and a second time to serialize it.
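The following self-contained Java sketch makes the trade-off concrete. It deliberately avoids Flink's real classes. `LazyIterator` is a hypothetical stand-in for the RocksDB-backed iterator. The byte layout is an assumed format chosen only for illustration: each entry is written as the key, then a null flag, then the value if present, with strings encoded via `DataOutputStream.writeUTF`. `serializeEntries` writes each entry as soon as `next()` returns it. `materializeThenSerialize` first copies everything into a `Map`, as a Map-based serializer would require, and then walks that Map a second time.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class SinglePassMapSerialization {

    /** Hypothetical stand-in for a store iterator whose entries are produced lazily by next(). */
    static final class LazyIterator implements Iterator<Map.Entry<String, String>> {
        private final List<String[]> backing;
        private int pos = 0;
        int produced = 0; // number of entries pulled so far via next()

        LazyIterator(List<String[]> backing) {
            this.backing = backing;
        }

        @Override
        public boolean hasNext() {
            return pos < backing.size();
        }

        @Override
        public Map.Entry<String, String> next() {
            String[] kv = backing.get(pos++);
            produced++;
            return new AbstractMap.SimpleImmutableEntry<>(kv[0], kv[1]);
        }
    }

    /** Single pass: each entry is written as soon as next() returns it; no intermediate Map. */
    static byte[] serializeEntries(Iterator<Map.Entry<String, String>> it) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        while (it.hasNext()) {
            Map.Entry<String, String> e = it.next();
            out.writeUTF(e.getKey());
            out.writeBoolean(e.getValue() == null); // assumed layout: null flag before the value
            if (e.getValue() != null) {
                out.writeUTF(e.getValue());
            }
        }
        out.flush();
        return bytes.toByteArray();
    }

    /** Two passes: drain the iterator into a Map, then walk the Map again to serialize it. */
    static byte[] materializeThenSerialize(Iterator<Map.Entry<String, String>> it) throws IOException {
        Map<String, String> map = new LinkedHashMap<>(); // keeps insertion order for comparison
        while (it.hasNext()) { // pass 1: build the Map
            Map.Entry<String, String> e = it.next();
            map.put(e.getKey(), e.getValue());
        }
        return serializeEntries(map.entrySet().iterator()); // pass 2: serialize the Map
    }

    public static void main(String[] args) throws IOException {
        List<String[]> data = List.of(
                new String[] {"a", "1"},
                new String[] {"b", null},
                new String[] {"c", "3"});
        LazyIterator it = new LazyIterator(data);
        byte[] bytes = serializeEntries(it);
        System.out.println(it.produced + " entries, " + bytes.length + " bytes"); // prints "3 entries, 18 bytes"
    }
}
```

Both strategies emit the same bytes here. The second one, however, holds every entry in memory and touches each entry twice, which is the cost the single-pass approach avoids.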
---