GitHub user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r174128377
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java ---
    @@ -78,13 +75,22 @@ public KvStateServerHandler(
                final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
     
                try {
    -                   final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
    +                   final KvStateEntry<?, ?, ?> kvState = registry.getKvState(request.getKvStateId());
                        if (kvState == null) {
                                responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
                        } else {
                                byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
     
    -                           byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
    +                           // here we remove any type check...
    +                           // Ideally we want to keep that the info match the state.
    +                           final InternalKvState state = kvState.getState();
    +                           final KvStateInfo info = kvState.getInfoForCurrentThread();
    +
    +                           byte[] serializedResult = state.getSerializedValue(
    --- End diff --
    
    Couldn't we synchronize on `kvState` instead of modifying all `InternalKvState` implementations?
    
    This seems like a much safer alternative than baking in the assumption that `getSerializedValue()` can be called concurrently.
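    
    A minimal sketch of what synchronizing on the entry could look like. The types below (`SketchState`, `SketchEntry`, `SynchronizedQuerySketch`) are hypothetical stand-ins, not the actual Flink classes, and the sketch assumes that every query for a given state goes through the same entry object:

```java
/** Hypothetical stand-in for a state whose getSerializedValue() is not thread-safe. */
interface SketchState {
    byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;
}

/** Hypothetical stand-in for the registry entry that wraps a state. */
final class SketchEntry {
    private final SketchState state;

    SketchEntry(SketchState state) {
        this.state = state;
    }

    SketchState getState() {
        return state;
    }
}

/** Hypothetical handler-side helper; not part of Flink. */
final class SynchronizedQuerySketch {
    private SynchronizedQuerySketch() {}

    static byte[] query(SketchEntry entry, byte[] serializedKeyAndNamespace) throws Exception {
        // Lock on the entry so that at most one thread at a time calls
        // getSerializedValue() on the wrapped state. The state implementation
        // itself does not need to be thread-safe.
        synchronized (entry) {
            return entry.getState().getSerializedValue(serializedKeyAndNamespace);
        }
    }
}
```

    The trade-off is that concurrent queries against the same state are serialized, in exchange for not having to audit or change every state implementation.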


