[
https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398545#comment-16398545
]
ASF GitHub Bot commented on FLINK-8802:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5691#discussion_r174450032
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
---
@@ -140,19 +159,28 @@ public boolean contains(UK userKey) {
}
@Override
- public byte[] getSerializedValue(K key, N namespace) throws IOException
{
- Preconditions.checkState(namespace != null, "No namespace
given.");
- Preconditions.checkState(key != null, "No key given.");
+ public byte[] getSerializedValue(
+ byte[] serializedKeyAndNamespace,
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<HashMap<UK, UV>> valueSerializer) throws
Exception {
- HashMap<UK, UV> result = stateTable.get(key, namespace);
+ Preconditions.checkNotNull(serializedKeyAndNamespace,
"Serialized key and namespace");
- if (null == result) {
+ Tuple2<K, N> keyAndNamespace =
KvStateSerializer.deserializeKeyAndNamespace(
+ serializedKeyAndNamespace, keySerializer,
namespaceSerializer);
+
+ Map<UK, UV> result = stateTable.get(keyAndNamespace.f0,
keyAndNamespace.f1);
+
+ if (result == null) {
return null;
}
- TypeSerializer<UK> userKeySerializer =
stateDesc.getKeySerializer();
- TypeSerializer<UV> userValueSerializer =
stateDesc.getValueSerializer();
+ final HashMapSerializer<UK, UV> serializer =
(HashMapSerializer<UK, UV>) valueSerializer;
--- End diff --
i mean why isn't the signature of `KvStateSerializer.serializeMap`:
```
KvStateSerializer.serializeMap(Map<UK, UV> map, TypeSerializer<Map<UK, UV>>
serializer);
```
with the following implementation:
```
...
serializeMap(Map<UK, UV> map, TypeSerializer<Map<UK, UV>> serializer) {
if (map != null) {
DataOutputSerializer dos = new DataOutputSerializer(32);
serializer.serialize(map, dos);
return dos.getCopyOfBuffer();
} else {
return null;
}
```
Why deal with the map key/value entries at all outside the serializer?
> Concurrent serialization without duplicating serializers in state server.
> -------------------------------------------------------------------------
>
> Key: FLINK-8802
> URL: https://issues.apache.org/jira/browse/FLINK-8802
> Project: Flink
> Issue Type: Bug
> Components: Queryable State
> Affects Versions: 1.5.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Blocker
> Fix For: 1.5.0
>
>
> The `getSerializedValue()` may be called by multiple threads but serializers
> are not duplicated, which may lead to exceptions thrown when a serializer is
> stateful.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)