[ https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398259#comment-16398259 ]
ASF GitHub Bot commented on FLINK-8802:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5691#discussion_r174382685
--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
         final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
         try {
-            final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
+            final KvStateEntry<?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
             if (kvState == null) {
                 responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
             } else {
                 byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
-                byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
+                // here we remove any type check...
+                // Ideally we want to keep that the info match the state.
--- End diff --
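You can retain type safety by moving the call into a generic helper method, so that the wildcard types of the `KvStateEntry` are captured as named type parameters: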
Call from the handler:
```
byte[] serializedResult = getSerializedValue(kvState, serializedKeyAndNamespace);
```
Added method:
```
private static <K, N, V> byte[] getSerializedValue(
        KvStateEntry<K, N, V> entry,
        byte[] serializedKeyAndNamespace) throws Exception {

    InternalKvState<K, N, V> state = entry.getState();
    KvStateInfo<K, N, V> infoForCurrentThread = entry.getInfoForCurrentThread();

    return state.getSerializedValue(
        serializedKeyAndNamespace,
        infoForCurrentThread.getKeySerializer(),
        infoForCurrentThread.getNamespaceSerializer(),
        infoForCurrentThread.getStateValueSerializer()
    );
}
```
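For reference, here is a minimal, self-contained sketch of the same wildcard-capture idea outside of Flink. The names `WildcardCaptureSketch`, `Entry`, `handle` and `serialize` are hypothetical stand-ins, and a `Function<V, byte[]>` plays the role of the serializer; none of this is the actual Flink API.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in types for illustration only; these are NOT the Flink classes.
final class WildcardCaptureSketch {

    /** Pairs a value with a serializer for the same type V. */
    static final class Entry<V> {
        private final V value;
        private final Function<V, byte[]> serializer;

        Entry(V value, Function<V, byte[]> serializer) {
            this.value = value;
            this.serializer = serializer;
        }

        V getValue() {
            return value;
        }

        Function<V, byte[]> getSerializer() {
            return serializer;
        }
    }

    /**
     * The caller only knows Entry<?>. Writing
     * entry.getSerializer().apply(entry.getValue()) here would not compile,
     * because each access to the wildcard gets its own capture.
     * Delegating to the generic helper captures the wildcard exactly once.
     */
    static byte[] handle(Entry<?> entry) {
        return serialize(entry);
    }

    /** Inside the helper V is a named type, so value and serializer are checked to match. */
    private static <V> byte[] serialize(Entry<V> entry) {
        V value = entry.getValue();
        Function<V, byte[]> serializer = entry.getSerializer();
        return serializer.apply(value);
    }

    public static void main(String[] args) {
        Entry<Integer> typed = new Entry<Integer>(
                42, i -> Integer.toString(i).getBytes(StandardCharsets.UTF_8));
        Entry<?> entry = typed; // the type information is erased to a wildcard here
        System.out.println(new String(handle(entry), StandardCharsets.UTF_8)); // prints 42
    }
}
```

The helper needs no casts or `@SuppressWarnings`: the compiler itself verifies that the serializer and the value belong to the same entry.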
> Concurrent serialization without duplicating serializers in state server.
> -------------------------------------------------------------------------
>
> Key: FLINK-8802
> URL: https://issues.apache.org/jira/browse/FLINK-8802
> Project: Flink
> Issue Type: Bug
> Components: Queryable State
> Affects Versions: 1.5.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Blocker
> Fix For: 1.5.0
>
>
> `getSerializedValue()` may be called by multiple threads, but the serializers
> are not duplicated, which may lead to exceptions being thrown when a serializer
> is stateful.