[ 
https://issues.apache.org/jira/browse/FLINK-8058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646244#comment-16646244
 ] 

ASF GitHub Bot commented on FLINK-8058:
---------------------------------------

klion26 closed pull request #6430: [FLINK-8058][Queryable State]Queryable state 
should check types
URL: https://github.com/apache/flink/pull/6430
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 470c7acf9d7..70f2c31642c 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -263,14 +263,16 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig 
config) {
                stateDescriptor.initializeSerializerUnlessSet(executionConfig);
 
                final byte[] serializedKeyAndNamespace;
+               final byte[] serializedStateDescriptor;
                try {
                        serializedKeyAndNamespace = KvStateSerializer
                                        .serializeKeyAndNamespace(key, 
keySerializer, namespace, namespaceSerializer);
+                       serializedStateDescriptor = 
KvStateSerializer.serializedStateDescriptor(stateDescriptor);
                } catch (IOException e) {
                        return FutureUtils.getFailedFuture(e);
                }
 
-               return getKvState(jobId, queryableStateName, key.hashCode(), 
serializedKeyAndNamespace)
+               return getKvState(jobId, queryableStateName, key.hashCode(), 
serializedKeyAndNamespace, serializedStateDescriptor)
                        .thenApply(stateResponse -> createState(stateResponse, 
stateDescriptor));
        }
 
@@ -306,10 +308,12 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig 
config) {
                        final JobID jobId,
                        final String queryableStateName,
                        final int keyHashCode,
-                       final byte[] serializedKeyAndNamespace) {
+                       final byte[] serializedKeyAndNamespace,
+                       final byte[] serializedStateDescriptor) {
                LOG.debug("Sending State Request to {}.", remoteAddress);
                try {
-                       KvStateRequest request = new KvStateRequest(jobId, 
queryableStateName, keyHashCode, serializedKeyAndNamespace);
+                       KvStateRequest request = new KvStateRequest(jobId, 
queryableStateName, keyHashCode,
+                                                                               
        serializedKeyAndNamespace, serializedStateDescriptor);
                        return client.sendRequest(remoteAddress, request);
                } catch (Exception e) {
                        LOG.error("Unable to send KVStateRequest: ", e);
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
index 4a64678e550..0caf68b28a7 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
@@ -18,12 +18,17 @@
 
 package org.apache.flink.queryablestate.client.state.serialization;
 
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -264,4 +269,39 @@
                        return null;
                }
        }
+
+       /**
+        * Serialize a stateDescriptor to bytes[].
+        * @param stateDescriptor the value will be serialized.
+        *
+        * @return The serialized values
+        * @throws IOException On failure during serialization
+        */
+       public static byte[] serializedStateDescriptor(StateDescriptor<?, ?> 
stateDescriptor) throws IOException {
+               ByteArrayOutputStream bos = new ByteArrayOutputStream();
+               try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
+                       out.writeObject(stateDescriptor);
+               }
+
+               byte[] buf = bos.toByteArray();
+               return buf;
+       }
+
+       /**
+        * Deserialized StateDescriptor from byte[].
+        * @param serializedValue the serializedValue
+        *
+        * @return The deserialized stateDescriptor
+        * @throws IOException On failure during deserialization
+        * @throws ClassNotFoundException on failure during deserialization
+        */
+       public static StateDescriptor<?, ?> deserializeStateDescriptor(byte[] 
serializedValue) throws IOException, ClassNotFoundException {
+               if (serializedValue == null) {
+                       return null;
+               }
+
+               ByteArrayInputStream bis = new 
ByteArrayInputStream(serializedValue);
+               ObjectInputStream in = new ObjectInputStream(bis);
+               return (StateDescriptor<?, ?>) in.readObject();
+       }
 }
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
index 2e2ea6a3fcb..11144e1bf28 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
@@ -41,17 +41,20 @@
        private final String stateName;
        private final int keyHashCode;
        private final byte[] serializedKeyAndNamespace;
+       private final byte[] serializedStateDescriptor;
 
        public KvStateRequest(
                        final JobID jobId,
                        final String stateName,
                        final int keyHashCode,
-                       final byte[] serializedKeyAndNamespace) {
+                       final byte[] serializedKeyAndNamespace,
+                       final byte[] serializedStateDescriptor) {
 
                this.jobId = Preconditions.checkNotNull(jobId);
                this.stateName = Preconditions.checkNotNull(stateName);
                this.keyHashCode = keyHashCode;
                this.serializedKeyAndNamespace = 
Preconditions.checkNotNull(serializedKeyAndNamespace);
+               this.serializedStateDescriptor = serializedStateDescriptor;
        }
 
        public JobID getJobId() {
@@ -70,17 +73,23 @@ public int getKeyHashCode() {
                return serializedKeyAndNamespace;
        }
 
+       public byte[] getSerializedStateDescriptor() {
+               return serializedStateDescriptor;
+       }
+
        @Override
        public byte[] serialize() {
 
                byte[] serializedStateName = 
stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
 
                // JobID + stateName + sizeOf(stateName) + hashCode + 
keyAndNamespace + sizeOf(keyAndNamespace)
+               // + stateDescriptor + sizeOf(stateDescriptor)
                final int size =
                                JobID.SIZE +
                                serializedStateName.length + Integer.BYTES +
                                Integer.BYTES +
-                               serializedKeyAndNamespace.length + 
Integer.BYTES;
+                               serializedKeyAndNamespace.length + 
Integer.BYTES +
+                               serializedStateDescriptor.length + 
Integer.BYTES;
 
                return ByteBuffer.allocate(size)
                                .putLong(jobId.getLowerPart())
@@ -90,6 +99,8 @@ public int getKeyHashCode() {
                                .putInt(keyHashCode)
                                .putInt(serializedKeyAndNamespace.length)
                                .put(serializedKeyAndNamespace)
+                               .putInt(serializedStateDescriptor.length)
+                               .put(serializedStateDescriptor)
                                .array();
        }
 
@@ -135,7 +146,17 @@ public KvStateRequest deserializeMessage(ByteBuf buf) {
                        if (knamespaceLength > 0) {
                                buf.readBytes(serializedKeyAndNamespace);
                        }
-                       return new KvStateRequest(jobId, stateName, 
keyHashCode, serializedKeyAndNamespace);
+
+                       int descriptorLength = buf.readInt();
+                       Preconditions.checkArgument(descriptorLength >= 0,
+                               "Negative length for stateDescriptor. " +
+                               "This indicates a serialization error.");
+
+                       byte[] serializedStateDescriptor = new 
byte[descriptorLength];
+                       if (descriptorLength > 0) {
+                               buf.readBytes(serializedStateDescriptor);
+                       }
+                       return new KvStateRequest(jobId, stateName, 
keyHashCode, serializedKeyAndNamespace, serializedStateDescriptor);
                }
        }
 }
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index 1586566bb17..4dbfcd7244b 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -171,7 +171,7 @@ private void executeActionAsync(
                                                // Query server
                                                final KvStateID kvStateId = 
location.getKvStateID(keyGroupIndex);
                                                final KvStateInternalRequest 
internalRequest = new KvStateInternalRequest(
-                                                               kvStateId, 
request.getSerializedKeyAndNamespace());
+                                                               kvStateId, 
request.getSerializedKeyAndNamespace(), request.getSerializedStateDescriptor());
                                                return 
kvStateClient.sendRequest(serverAddress, internalRequest);
                                        }
                                }, queryExecutor);
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
index 8c8de59a6e6..28ade944019 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
@@ -38,13 +38,16 @@
 
        private final KvStateID kvStateId;
        private final byte[] serializedKeyAndNamespace;
+       private final byte[] serializedStateDescriptor;
 
        public KvStateInternalRequest(
                        final KvStateID stateId,
-                       final byte[] serializedKeyAndNamespace) {
+                       final byte[] serializedKeyAndNamespace,
+                       final byte[] serializedStateDescriptor) {
 
                this.kvStateId = Preconditions.checkNotNull(stateId);
                this.serializedKeyAndNamespace = 
Preconditions.checkNotNull(serializedKeyAndNamespace);
+               this.serializedStateDescriptor = 
Preconditions.checkNotNull(serializedStateDescriptor);
        }
 
        public KvStateID getKvStateId() {
@@ -55,17 +58,25 @@ public KvStateID getKvStateId() {
                return serializedKeyAndNamespace;
        }
 
+       public byte[] getSerializedStateDescriptor() {
+               return serializedStateDescriptor;
+       }
+
        @Override
        public byte[] serialize() {
 
                // KvStateId + sizeOf(serializedKeyAndNamespace) + 
serializedKeyAndNamespace
-               final int size = KvStateID.SIZE + Integer.BYTES + 
serializedKeyAndNamespace.length;
+               // + sizeOf(serializedStateDescriptor) + 
serializedStateDescriptor
+               final int size = KvStateID.SIZE + Integer.BYTES + 
serializedKeyAndNamespace.length
+                                               + Integer.BYTES + 
serializedStateDescriptor.length;
 
                return ByteBuffer.allocate(size)
                                .putLong(kvStateId.getLowerPart())
                                .putLong(kvStateId.getUpperPart())
                                .putInt(serializedKeyAndNamespace.length)
                                .put(serializedKeyAndNamespace)
+                               .putInt(serializedStateDescriptor.length)
+                               .put(serializedStateDescriptor)
                                .array();
        }
 
@@ -87,7 +98,17 @@ public KvStateInternalRequest deserializeMessage(ByteBuf 
buf) {
                        if (length > 0) {
                                buf.readBytes(serializedKeyAndNamespace);
                        }
-                       return new KvStateInternalRequest(kvStateId, 
serializedKeyAndNamespace);
+
+                       int descriptorLength = buf.readInt();
+                       Preconditions.checkArgument(descriptorLength >= 0,
+                                                       "Negative length for 
key and namespace. " +
+                                                       "This indicates a 
serialization error.");
+                       byte[] serializedStateDescriptor = new 
byte[descriptorLength];
+                       if (descriptorLength > 0) {
+                               buf.readBytes(serializedStateDescriptor);
+                       }
+
+                       return new KvStateInternalRequest(kvStateId, 
serializedKeyAndNamespace, serializedStateDescriptor);
                }
        }
 }
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
index d46deffeb58..b6de5dd844a 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -19,6 +19,8 @@
 package org.apache.flink.queryablestate.server;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.StateDescriptor;
+import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import 
org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
 import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
 import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
@@ -81,6 +83,14 @@ public KvStateServerHandler(
                        } else {
                                byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
+                               StateDescriptor<?, ?> requestStateDescriptor = 
KvStateSerializer.deserializeStateDescriptor(request.getSerializedStateDescriptor());
+                               StateDescriptor<?, ?> registStateDescriptor = 
kvState.getStateDescriptor();
+
+                               
Preconditions.checkArgument(requestStateDescriptor.getType().equals(registStateDescriptor.getType()),
+                                       "State type mismatch, need[%s] 
gotten[%s]", registStateDescriptor.getType(), requestStateDescriptor.getType());
+                               
Preconditions.checkArgument(requestStateDescriptor.getSerializer().equals(registStateDescriptor.getSerializer()),
+                                       "State value serializer mismatch, need 
[%s] gotten[%s]" , registStateDescriptor.getSerializer(), 
requestStateDescriptor.getSerializer());
+
                                byte[] serializedResult = 
getSerializedValue(kvState, serializedKeyAndNamespace);
                                if (serializedResult != null) {
                                        responseFuture.complete(new 
KvStateResponse(serializedResult));
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index bceb3615bde..abc112ddd9f 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -158,7 +158,7 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) throws Exception
 
                        List<CompletableFuture<KvStateResponse>> futures = new 
ArrayList<>();
                        for (long i = 0L; i < numQueries; i++) {
-                               KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
+                               KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
                                futures.add(client.sendRequest(serverAddress, 
request));
                        }
 
@@ -277,7 +277,7 @@ public void testRequestUnavailableHost() throws Exception {
                                        InetAddress.getLocalHost(),
                                        availablePort);
 
-                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
+                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
                        CompletableFuture<KvStateResponse> future = 
client.sendRequest(serverAddress, request);
 
                        try {
@@ -356,7 +356,7 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) throws Exception
                                List<CompletableFuture<KvStateResponse>> 
results = new ArrayList<>(numQueriesPerTask);
 
                                for (int i = 0; i < numQueriesPerTask; i++) {
-                                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
+                                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
                                        
results.add(finalClient.sendRequest(serverAddress, request));
                                }
 
@@ -446,7 +446,7 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) throws Exception
 
                        // Requests
                        List<Future<KvStateResponse>> futures = new 
ArrayList<>();
-                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
+                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
 
                        futures.add(client.sendRequest(serverAddress, request));
                        futures.add(client.sendRequest(serverAddress, request));
@@ -555,7 +555,7 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) throws Exception
                        InetSocketAddress serverAddress = 
getKvStateServerAddress(serverChannel);
 
                        // Requests
-                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0]);
+                       KvStateInternalRequest request = new 
KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
                        Future<KvStateResponse> future = 
client.sendRequest(serverAddress, request);
 
                        while (!received.get() && deadline.hasTimeLeft()) {
@@ -689,7 +689,7 @@ public void testClientServerIntegration() throws Throwable {
                                InternalKvState<Integer, ?, Integer> kvState = 
(InternalKvState<Integer, ?, Integer>) state;
 
                                // Register KvState (one state instance for all 
server)
-                               ids[i] = registry[i].registerKvState(new 
JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
+                               ids[i] = registry[i].registerKvState(new 
JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState, desc);
                        }
 
                        final Client<KvStateInternalRequest, KvStateResponse> 
finalClient = client;
@@ -717,8 +717,10 @@ public void testClientServerIntegration() throws Throwable 
{
                                                                
IntSerializer.INSTANCE,
                                                                
VoidNamespace.INSTANCE,
                                                                
VoidNamespaceSerializer.INSTANCE);
+                                               byte[] 
serializedStateDescriptor = KvStateSerializer.serializedStateDescriptor(desc);
 
-                                               KvStateInternalRequest request 
= new KvStateInternalRequest(ids[targetServer], serializedKeyAndNamespace);
+                                               KvStateInternalRequest request 
= new KvStateInternalRequest(ids[targetServer],
+                                                                               
                                                serializedKeyAndNamespace, 
serializedStateDescriptor);
                                                
futures.add(finalClient.sendRequest(server[targetServer].getServerAddress(), 
request));
                                        }
 
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index adcf3aefb7c..c131be8b5f5 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.queryablestate.network;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -148,13 +149,14 @@ public void testSimpleQuery() throws Exception {
                                IntSerializer.INSTANCE,
                                VoidNamespace.INSTANCE,
                                VoidNamespaceSerializer.INSTANCE);
+               byte[] serializedStateDescriptor = 
KvStateSerializer.serializedStateDescriptor(desc);
 
                long requestId = Integer.MAX_VALUE + 182828L;
 
                assertTrue(registryListener.registrationName.equals("vanilla"));
 
                KvStateInternalRequest request = new KvStateInternalRequest(
-                               registryListener.kvStateId, 
serializedKeyAndNamespace);
+                               registryListener.kvStateId, 
serializedKeyAndNamespace, serializedStateDescriptor);
 
                ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
@@ -202,7 +204,7 @@ public void testQueryUnknownKvStateID() throws Exception {
 
                long requestId = Integer.MAX_VALUE + 182828L;
 
-               KvStateInternalRequest request = new KvStateInternalRequest(new 
KvStateID(), new byte[0]);
+               KvStateInternalRequest request = new KvStateInternalRequest(new 
KvStateID(), new byte[0], new byte[0]);
 
                ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
@@ -259,12 +261,13 @@ public void testQueryUnknownKey() throws Exception {
                                IntSerializer.INSTANCE,
                                VoidNamespace.INSTANCE,
                                VoidNamespaceSerializer.INSTANCE);
+               byte[] serializedStateDescriptor = 
KvStateSerializer.serializedStateDescriptor(desc);
 
                long requestId = Integer.MAX_VALUE + 22982L;
 
                assertTrue(registryListener.registrationName.equals("vanilla"));
 
-               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
+               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace, 
serializedStateDescriptor);
                ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
                // Write the request and wait for the response
@@ -300,51 +303,18 @@ public void testFailureOnGetSerializedValue() throws 
Exception {
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
                // Failing KvState
-               InternalKvState<Integer, VoidNamespace, Long> kvState =
-                               new InternalKvState<Integer, VoidNamespace, 
Long>() {
-                                       @Override
-                                       public TypeSerializer<Integer> 
getKeySerializer() {
-                                               return IntSerializer.INSTANCE;
-                                       }
-
-                                       @Override
-                                       public TypeSerializer<VoidNamespace> 
getNamespaceSerializer() {
-                                               return 
VoidNamespaceSerializer.INSTANCE;
-                                       }
-
-                                       @Override
-                                       public TypeSerializer<Long> 
getValueSerializer() {
-                                               return LongSerializer.INSTANCE;
-                                       }
-
-                                       @Override
-                                       public void 
setCurrentNamespace(VoidNamespace namespace) {
-                                               // do nothing
-                                       }
-
-                                       @Override
-                                       public byte[] getSerializedValue(
-                                                       final byte[] 
serializedKeyAndNamespace,
-                                                       final 
TypeSerializer<Integer> safeKeySerializer,
-                                                       final 
TypeSerializer<VoidNamespace> safeNamespaceSerializer,
-                                                       final 
TypeSerializer<Long> safeValueSerializer) throws Exception {
-                                               throw new 
RuntimeException("Expected test Exception");
-                                       }
-
-                                       @Override
-                                       public void clear() {
-
-                                       }
-                               };
+               InternalKvState<Integer, VoidNamespace, Long> kvState = 
getFailingState();
+               ValueStateDescriptor<Long> descriptor = new 
ValueStateDescriptor("test", LongSerializer.INSTANCE);
 
                KvStateID kvStateId = registry.registerKvState(
                                new JobID(),
                                new JobVertexID(),
                                new KeyGroupRange(0, 0),
                                "vanilla",
-                               kvState);
+                               kvState,
+                               descriptor);
 
-               KvStateInternalRequest request = new 
KvStateInternalRequest(kvStateId, new byte[0]);
+               KvStateInternalRequest request = new 
KvStateInternalRequest(kvStateId, new byte[0], 
KvStateSerializer.serializedStateDescriptor(descriptor));
                ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
 
                // Write the request and wait for the response
@@ -363,6 +333,92 @@ public void clear() {
                assertEquals(1L, stats.getNumFailed());
        }
 
+       @Test
+       public void testFailureOnCheckStateType() throws Exception {
+               KvStateRegistry registry = new KvStateRegistry();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
+                       new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
+               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
+
+               // Failing KvState
+               InternalKvState<Integer, VoidNamespace, Long> kvState = 
getFailingState();
+               ValueStateDescriptor<Long> registDescriptor = new 
ValueStateDescriptor("regist", LongSerializer.INSTANCE);
+
+               KvStateID kvStateId = registry.registerKvState(
+                       new JobID(),
+                       new JobVertexID(),
+                       new KeyGroupRange(0, 0),
+                       "vanilla",
+                       kvState,
+                       registDescriptor);
+
+               ListStateDescriptor<Long> requestdescriptor = new 
ListStateDescriptor("test", LongSerializer.INSTANCE);
+               KvStateInternalRequest request = new 
KvStateInternalRequest(kvStateId, new byte[0], 
KvStateSerializer.serializedStateDescriptor(requestdescriptor));
+               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
+
+               // Write the request and wait for the response
+               channel.writeInbound(serRequest);
+
+               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+               buf.skipBytes(4); // skip frame length
+
+               // Verify the response
+               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+               RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
+
+               assertTrue(response.getCause().getMessage().contains("State 
type mismatch"));
+
+               assertEquals(1L, stats.getNumRequests());
+               assertEquals(1L, stats.getNumFailed());
+       }
+
+       @Test
+       public void testFailureOnCheckStateValueType() throws Exception {
+               KvStateRegistry registry = new KvStateRegistry();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
+                       new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
+               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
+
+               // Failing KvState
+               InternalKvState<Integer, VoidNamespace, Long> kvState = 
getFailingState();
+
+               ValueStateDescriptor<Long> registDescriptor = new 
ValueStateDescriptor("test", LongSerializer.INSTANCE);
+               KvStateID kvStateId = registry.registerKvState(
+                       new JobID(),
+                       new JobVertexID(),
+                       new KeyGroupRange(0, 0),
+                       "vanilla",
+                       kvState,
+                       registDescriptor);
+
+               ValueStateDescriptor<Long> requestDescriptor = new 
ValueStateDescriptor("test", IntSerializer.INSTANCE);
+               KvStateInternalRequest request = new 
KvStateInternalRequest(kvStateId, new byte[0], 
KvStateSerializer.serializedStateDescriptor(requestDescriptor));
+               ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
+
+               // Write the request and wait for the response
+               channel.writeInbound(serRequest);
+
+               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+               buf.skipBytes(4); // skip frame length
+
+               // Verify the response
+               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+               RequestFailure response = 
MessageSerializer.deserializeRequestFailure(buf);
+
+               assertTrue(response.getCause().getMessage().contains("State 
value serializer mismatch"));
+
+               assertEquals(1L, stats.getNumRequests());
+               assertEquals(1L, stats.getNumFailed());
+       }
+
        /**
         * Tests that the channel is closed if an Exception reaches the channel 
handler.
         */
@@ -435,7 +491,7 @@ public void testQueryExecutorShutDown() throws Throwable {
 
                assertTrue(registryListener.registrationName.equals("vanilla"));
 
-               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, new byte[0]);
+               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, new byte[0], new byte[0]);
                ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
 
                // Write the request and wait for the response
@@ -519,7 +575,7 @@ public void testIncomingBufferIsRecycled() throws Exception 
{
                KvStateServerHandler handler = new 
KvStateServerHandler(testServer, registry, serializer, stats);
                EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
 
-               KvStateInternalRequest request = new KvStateInternalRequest(new 
KvStateID(), new byte[0]);
+               KvStateInternalRequest request = new KvStateInternalRequest(new 
KvStateID(), new byte[0], new byte[0]);
                ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
 
                assertEquals(1L, serRequest.refCnt());
@@ -588,10 +644,12 @@ public void testSerializerMismatch() throws Exception {
                                IntSerializer.INSTANCE,
                                "wrong-namespace-type",
                                StringSerializer.INSTANCE);
+               byte[] serializedStateDescriptor = 
KvStateSerializer.serializedStateDescriptor(desc);
 
                assertTrue(registryListener.registrationName.equals("vanilla"));
 
-               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, wrongKeyAndNamespace);
+               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId,
+                                                                               
                                                        wrongKeyAndNamespace, 
serializedStateDescriptor);
                ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 182828L, request);
 
                // Write the request and wait for the response
@@ -607,7 +665,7 @@ public void testSerializerMismatch() throws Exception {
                
assertTrue(response.getCause().getMessage().contains("IOException"));
 
                // Repeat with wrong namespace only
-               request = new 
KvStateInternalRequest(registryListener.kvStateId, wrongNamespace);
+               request = new 
KvStateInternalRequest(registryListener.kvStateId, wrongNamespace, 
serializedStateDescriptor);
                serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), 182829L, request);
 
                // Write the request and wait for the response
@@ -676,12 +734,13 @@ public void testChunkedResponse() throws Exception {
                                IntSerializer.INSTANCE,
                                VoidNamespace.INSTANCE,
                                VoidNamespaceSerializer.INSTANCE);
+               byte[] serializedStateDescriptor = 
KvStateSerializer.serializedStateDescriptor(desc);
 
                long requestId = Integer.MAX_VALUE + 182828L;
 
                assertTrue(registryListener.registrationName.equals("vanilla"));
 
-               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
+               KvStateInternalRequest request = new 
KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace, 
serializedStateDescriptor);
                ByteBuf serRequest = 
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
 
                // Write the request and wait for the response
@@ -765,4 +824,44 @@ public void notifyKvStateUnregistered(JobID jobId,
                        registry.createTaskRegistry(dummyEnv.getJobID(), 
dummyEnv.getJobVertexId()),
                        TtlTimeProvider.DEFAULT);
        }
+
+       private InternalKvState<Integer, VoidNamespace, Long> getFailingState() 
{
+               InternalKvState<Integer, VoidNamespace, Long> kvState =
+                       new InternalKvState<Integer, VoidNamespace, Long>() {
+                               @Override
+                               public TypeSerializer<Integer> 
getKeySerializer() {
+                                       return IntSerializer.INSTANCE;
+                               }
+
+                               @Override
+                               public TypeSerializer<VoidNamespace> 
getNamespaceSerializer() {
+                                       return VoidNamespaceSerializer.INSTANCE;
+                               }
+
+                               @Override
+                               public TypeSerializer<Long> 
getValueSerializer() {
+                                       return LongSerializer.INSTANCE;
+                               }
+
+                               @Override
+                               public void setCurrentNamespace(VoidNamespace 
namespace) {
+                                       // do nothing
+                               }
+
+                               @Override
+                               public byte[] getSerializedValue(
+                                       final byte[] serializedKeyAndNamespace,
+                                       final TypeSerializer<Integer> 
safeKeySerializer,
+                                       final TypeSerializer<VoidNamespace> 
safeNamespaceSerializer,
+                                       final TypeSerializer<Long> 
safeValueSerializer) throws Exception {
+                                       throw new RuntimeException("Expected 
test Exception");
+                               }
+
+                               @Override
+                               public void clear() {
+
+                               }
+                       };
+               return kvState;
+       }
 }
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
index 79c23ad2a2d..754aaea2f17 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -145,6 +145,7 @@ public void testSimpleRequest() throws Throwable {
                                        IntSerializer.INSTANCE,
                                        VoidNamespace.INSTANCE,
                                        VoidNamespaceSerializer.INSTANCE);
+                       byte[] serializedStateDescriptor = 
KvStateSerializer.serializedStateDescriptor(desc);
 
                        // Connect to the server
                        final BlockingQueue<ByteBuf> responses = new 
LinkedBlockingQueue<>();
@@ -167,7 +168,8 @@ public void channelRead(ChannelHandlerContext ctx, Object 
msg) throws Exception
 
                        final KvStateInternalRequest request = new 
KvStateInternalRequest(
                                        registryListener.kvStateId,
-                                       serializedKeyAndNamespace);
+                                       serializedKeyAndNamespace,
+                                       serializedStateDescriptor);
 
                        ByteBuf serializeRequest = 
MessageSerializer.serializeRequest(
                                        channel.alloc(),
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
index acaa0671bc0..a36612ff757 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
@@ -64,8 +64,9 @@ public void testRequestSerialization() throws Exception {
                long requestId = Integer.MAX_VALUE + 1337L;
                KvStateID kvStateId = new KvStateID();
                byte[] serializedKeyAndNamespace = randomByteArray(1024);
+               byte[] serializedStateDescriptor = randomByteArray(1024);
 
-               final KvStateInternalRequest request = new 
KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+               final KvStateInternalRequest request = new 
KvStateInternalRequest(kvStateId, serializedKeyAndNamespace, 
serializedStateDescriptor);
                final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
                                new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
 
@@ -91,8 +92,9 @@ public void 
testRequestSerializationWithZeroLengthKeyAndNamespace() throws Excep
                long requestId = Integer.MAX_VALUE + 1337L;
                KvStateID kvStateId = new KvStateID();
                byte[] serializedKeyAndNamespace = new byte[0];
+               byte[] serializedStateDescriptor = new byte[0];
 
-               final KvStateInternalRequest request = new 
KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+               final KvStateInternalRequest request = new 
KvStateInternalRequest(kvStateId, serializedKeyAndNamespace, 
serializedStateDescriptor);
                final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer =
                                new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());
 
@@ -115,7 +117,16 @@ public void 
testRequestSerializationWithZeroLengthKeyAndNamespace() throws Excep
         */
        @Test(expected = NullPointerException.class)
        public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() 
throws Exception {
-               new KvStateInternalRequest(new KvStateID(), null);
+               new KvStateInternalRequest(new KvStateID(), null, new byte[0]);
+       }
+
+       /**
+        * Tests that we don't try to be smart about <code>null</code> 
stateDescriptor.
+        * They should be treated explicityly.
+        */
+       @Test(expected = NullPointerException.class)
+       public void testNullPointerExceptionOnNullSerializedStateDescriptor() 
throws Exception {
+               new KvStateInternalRequest(new KvStateID(), new byte[0], null);
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
index 0bd132f6e05..e9fdd6633ab 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;
 
@@ -38,13 +39,15 @@
 
        private final InternalKvState<K, N, V> state;
        private final KvStateInfo<K, N, V> stateInfo;
+       private final StateDescriptor<?, ?> stateDescriptor;
 
        private final boolean areSerializersStateless;
 
        private final ConcurrentMap<Thread, KvStateInfo<K, N, V>> 
serializerCache;
 
-       public KvStateEntry(final InternalKvState<K, N, V> state) {
+       public KvStateEntry(final InternalKvState<K, N, V> state, final 
StateDescriptor<?, ?> stateDescriptor) {
                this.state = Preconditions.checkNotNull(state);
+               this.stateDescriptor = 
Preconditions.checkNotNull(stateDescriptor);
                this.stateInfo = new KvStateInfo<>(
                                state.getKeySerializer(),
                                state.getNamespaceSerializer(),
@@ -64,6 +67,10 @@ public KvStateEntry(final InternalKvState<K, N, V> state) {
                                : 
serializerCache.computeIfAbsent(Thread.currentThread(), t -> 
stateInfo.duplicate());
        }
 
+       public StateDescriptor getStateDescriptor() {
+               return this.stateDescriptor;
+       }
+
        public void clear() {
                serializerCache.clear();
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index 430b06bd7d1..14165f41287 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -81,15 +82,16 @@ public void unregisterListener(JobID jobId) {
         * @return Assigned KvStateID
         */
        public KvStateID registerKvState(
-                       JobID jobId,
-                       JobVertexID jobVertexId,
-                       KeyGroupRange keyGroupRange,
-                       String registrationName,
-                       InternalKvState<?, ?, ?> kvState) {
+               JobID jobId,
+               JobVertexID jobVertexId,
+               KeyGroupRange keyGroupRange,
+               String registrationName,
+               InternalKvState<?, ?, ?> kvState,
+               StateDescriptor<?, ?> stateDescriptor) {
 
                KvStateID kvStateId = new KvStateID();
 
-               if (registeredKvStates.putIfAbsent(kvStateId, new 
KvStateEntry<>(kvState)) == null) {
+               if (registeredKvStates.putIfAbsent(kvStateId, new 
KvStateEntry(kvState, stateDescriptor)) == null) {
                        final KvStateRegistryListener listener = 
getKvStateRegistryListener(jobId);
 
                        if (listener != null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
index a44a508ecbc..bfa6d724c0e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -58,11 +59,12 @@
         * @param registrationName The registration name (not necessarily the 
same
         *                         as the KvState name defined in the state
         *                         descriptor used to create the KvState 
instance)
-        * @param kvState          The
+        * @param kvState          The internal kv instate
+        * @param stateDescriptor  The descriptor of the state
         */
-       public void registerKvState(KeyGroupRange keyGroupRange, String 
registrationName, InternalKvState<?, ?, ?> kvState) {
-               KvStateID kvStateId = registry.registerKvState(jobId, 
jobVertexId, keyGroupRange, registrationName, kvState);
-               registeredKvStates.add(new KvStateInfo(keyGroupRange, 
registrationName, kvStateId));
+       public void registerKvState(KeyGroupRange keyGroupRange, String 
registrationName, InternalKvState<?, ?, ?> kvState, StateDescriptor<?, ?> 
stateDescriptor) {
+               KvStateID kvStateId = registry.registerKvState(jobId, 
jobVertexId, keyGroupRange, registrationName, kvState, stateDescriptor);
+               registeredKvStates.add(new KvStateInfo(keyGroupRange, 
registrationName, kvStateId, stateDescriptor));
        }
 
        /**
@@ -85,7 +87,7 @@ public void unregisterAll() {
 
                private final KvStateID kvStateId;
 
-               KvStateInfo(KeyGroupRange keyGroupRange, String 
registrationName, KvStateID kvStateId) {
+               KvStateInfo(KeyGroupRange keyGroupRange, String 
registrationName, KvStateID kvStateId, StateDescriptor stateDescriptor) {
                        this.keyGroupRange = keyGroupRange;
                        this.registrationName = registrationName;
                        this.kvStateId = kvStateId;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 17d24f77472..a8dc3064bd3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -251,7 +251,7 @@ private void publishQueryableStateIfEnabled(
                                throw new IllegalStateException("State backend 
has not been initialized for job.");
                        }
                        String name = stateDescriptor.getQueryableStateName();
-                       kvStateRegistry.registerKvState(keyGroupRange, name, 
kvState);
+                       kvStateRegistry.registerKvState(keyGroupRange, name, 
kvState, stateDescriptor);
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
index c1c56bf250b..deef7fe49be 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
@@ -19,10 +19,12 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.queryablestate.KvStateID;
@@ -77,7 +79,8 @@ public void testKvStateEntry() throws InterruptedException {
                                jobVertexId,
                                keyGroupRange,
                                registrationName,
-                               new DummyKvState()
+                               new DummyKvState(),
+                               getDummyStateDescriptor()
                );
 
                final AtomicReference<Throwable> exceptionHolder = new 
AtomicReference<>();
@@ -173,7 +176,8 @@ public void testKvStateRegistryListenerNotification() {
                        jobVertexId,
                        keyGroupRange,
                        registrationName,
-                       new DummyKvState());
+                       new DummyKvState(),
+                       getDummyStateDescriptor());
 
                assertThat(registeredNotifications1.poll(), equalTo(jobId1));
                assertThat(registeredNotifications2.isEmpty(), is(true));
@@ -186,7 +190,8 @@ public void testKvStateRegistryListenerNotification() {
                        jobVertexId2,
                        keyGroupRange2,
                        registrationName2,
-                       new DummyKvState());
+                       new DummyKvState(),
+                       getDummyStateDescriptor());
 
                assertThat(registeredNotifications2.poll(), equalTo(jobId2));
                assertThat(registeredNotifications1.isEmpty(), is(true));
@@ -244,7 +249,8 @@ public void testLegacyCodePathPreference() {
                        jobVertexId,
                        keyGroupRange,
                        registrationName,
-                       new DummyKvState());
+                       new DummyKvState(),
+                       getDummyStateDescriptor());
 
                assertThat(stateRegistrationNotifications.poll(), 
equalTo(jobId));
                // another listener should not have received any notifications
@@ -410,4 +416,9 @@ public TypeSerializerConfigSnapshot snapshotConfiguration() 
{
                        return null;
                }
        }
+
+       private ValueStateDescriptor<String> getDummyStateDescriptor() {
+               ValueStateDescriptor<String> dummyDescriptor = new 
ValueStateDescriptor<String>("desriptor", StringSerializer.INSTANCE);
+               return dummyDescriptor;
+       }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Queryable state should check types
> ----------------------------------
>
>                 Key: FLINK-8058
>                 URL: https://issues.apache.org/jira/browse/FLINK-8058
>             Project: Flink
>          Issue Type: Improvement
>          Components: Queryable State
>    Affects Versions: 1.4.0
>            Reporter: Chesnay Schepler
>            Assignee: Congxian Qiu
>            Priority: Major
>              Labels: pull-request-available
>
> The queryable state currently doesn't do any type checks on the client or 
> server and generally relies on serializers to catch errors.
> Neither the type of state is checked (ValueState, ListState etc.) nor the 
> type of contained values.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to