[
https://issues.apache.org/jira/browse/FLINK-8058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646244#comment-16646244
]
ASF GitHub Bot commented on FLINK-8058:
---------------------------------------
klion26 closed pull request #6430: [FLINK-8058][Queryable State]Queryable state
should check types
URL: https://github.com/apache/flink/pull/6430
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
Because this pull request comes from a fork, GitHub no longer displays
its diff once it has been merged, so the diff is reproduced below:
diff --git
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 470c7acf9d7..70f2c31642c 100644
---
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -263,14 +263,16 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig
config) {
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
final byte[] serializedKeyAndNamespace;
+ final byte[] serializedStateDescriptor;
try {
serializedKeyAndNamespace = KvStateSerializer
.serializeKeyAndNamespace(key,
keySerializer, namespace, namespaceSerializer);
+ serializedStateDescriptor =
KvStateSerializer.serializedStateDescriptor(stateDescriptor);
} catch (IOException e) {
return FutureUtils.getFailedFuture(e);
}
- return getKvState(jobId, queryableStateName, key.hashCode(),
serializedKeyAndNamespace)
+ return getKvState(jobId, queryableStateName, key.hashCode(),
serializedKeyAndNamespace, serializedStateDescriptor)
.thenApply(stateResponse -> createState(stateResponse,
stateDescriptor));
}
@@ -306,10 +308,12 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig
config) {
final JobID jobId,
final String queryableStateName,
final int keyHashCode,
- final byte[] serializedKeyAndNamespace) {
+ final byte[] serializedKeyAndNamespace,
+ final byte[] serializedStateDescriptor) {
LOG.debug("Sending State Request to {}.", remoteAddress);
try {
- KvStateRequest request = new KvStateRequest(jobId,
queryableStateName, keyHashCode, serializedKeyAndNamespace);
+ KvStateRequest request = new KvStateRequest(jobId,
queryableStateName, keyHashCode,
+
serializedKeyAndNamespace, serializedStateDescriptor);
return client.sendRequest(remoteAddress, request);
} catch (Exception e) {
LOG.error("Unable to send KVStateRequest: ", e);
diff --git
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
index 4a64678e550..0caf68b28a7 100644
---
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
+++
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/KvStateSerializer.java
@@ -18,12 +18,17 @@
package org.apache.flink.queryablestate.client.state.serialization;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -264,4 +269,39 @@
return null;
}
}
+
+ /**
+ * Serialize a stateDescriptor to bytes[].
+ * @param stateDescriptor the value will be serialized.
+ *
+ * @return The serialized values
+ * @throws IOException On failure during serialization
+ */
+ public static byte[] serializedStateDescriptor(StateDescriptor<?, ?>
stateDescriptor) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
+ out.writeObject(stateDescriptor);
+ }
+
+ byte[] buf = bos.toByteArray();
+ return buf;
+ }
+
+ /**
+ * Deserialized StateDescriptor from byte[].
+ * @param serializedValue the serializedValue
+ *
+ * @return The deserialized stateDescriptor
+ * @throws IOException On failure during deserialization
+ * @throws ClassNotFoundException on failure during deserialization
+ */
+ public static StateDescriptor<?, ?> deserializeStateDescriptor(byte[]
serializedValue) throws IOException, ClassNotFoundException {
+ if (serializedValue == null) {
+ return null;
+ }
+
+ ByteArrayInputStream bis = new
ByteArrayInputStream(serializedValue);
+ ObjectInputStream in = new ObjectInputStream(bis);
+ return (StateDescriptor<?, ?>) in.readObject();
+ }
}
diff --git
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
index 2e2ea6a3fcb..11144e1bf28 100644
---
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
+++
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/messages/KvStateRequest.java
@@ -41,17 +41,20 @@
private final String stateName;
private final int keyHashCode;
private final byte[] serializedKeyAndNamespace;
+ private final byte[] serializedStateDescriptor;
public KvStateRequest(
final JobID jobId,
final String stateName,
final int keyHashCode,
- final byte[] serializedKeyAndNamespace) {
+ final byte[] serializedKeyAndNamespace,
+ final byte[] serializedStateDescriptor) {
this.jobId = Preconditions.checkNotNull(jobId);
this.stateName = Preconditions.checkNotNull(stateName);
this.keyHashCode = keyHashCode;
this.serializedKeyAndNamespace =
Preconditions.checkNotNull(serializedKeyAndNamespace);
+ this.serializedStateDescriptor = serializedStateDescriptor;
}
public JobID getJobId() {
@@ -70,17 +73,23 @@ public int getKeyHashCode() {
return serializedKeyAndNamespace;
}
+ public byte[] getSerializedStateDescriptor() {
+ return serializedStateDescriptor;
+ }
+
@Override
public byte[] serialize() {
byte[] serializedStateName =
stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
// JobID + stateName + sizeOf(stateName) + hashCode +
keyAndNamespace + sizeOf(keyAndNamespace)
+ // + stateDescriptor + sizeOf(stateDescriptor)
final int size =
JobID.SIZE +
serializedStateName.length + Integer.BYTES +
Integer.BYTES +
- serializedKeyAndNamespace.length +
Integer.BYTES;
+ serializedKeyAndNamespace.length +
Integer.BYTES +
+ serializedStateDescriptor.length +
Integer.BYTES;
return ByteBuffer.allocate(size)
.putLong(jobId.getLowerPart())
@@ -90,6 +99,8 @@ public int getKeyHashCode() {
.putInt(keyHashCode)
.putInt(serializedKeyAndNamespace.length)
.put(serializedKeyAndNamespace)
+ .putInt(serializedStateDescriptor.length)
+ .put(serializedStateDescriptor)
.array();
}
@@ -135,7 +146,17 @@ public KvStateRequest deserializeMessage(ByteBuf buf) {
if (knamespaceLength > 0) {
buf.readBytes(serializedKeyAndNamespace);
}
- return new KvStateRequest(jobId, stateName,
keyHashCode, serializedKeyAndNamespace);
+
+ int descriptorLength = buf.readInt();
+ Preconditions.checkArgument(descriptorLength >= 0,
+ "Negative length for stateDescriptor. " +
+ "This indicates a serialization error.");
+
+ byte[] serializedStateDescriptor = new
byte[descriptorLength];
+ if (descriptorLength > 0) {
+ buf.readBytes(serializedStateDescriptor);
+ }
+ return new KvStateRequest(jobId, stateName,
keyHashCode, serializedKeyAndNamespace, serializedStateDescriptor);
}
}
}
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index 1586566bb17..4dbfcd7244b 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -171,7 +171,7 @@ private void executeActionAsync(
// Query server
final KvStateID kvStateId =
location.getKvStateID(keyGroupIndex);
final KvStateInternalRequest
internalRequest = new KvStateInternalRequest(
- kvStateId,
request.getSerializedKeyAndNamespace());
+ kvStateId,
request.getSerializedKeyAndNamespace(), request.getSerializedStateDescriptor());
return
kvStateClient.sendRequest(serverAddress, internalRequest);
}
}, queryExecutor);
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
index 8c8de59a6e6..28ade944019 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
@@ -38,13 +38,16 @@
private final KvStateID kvStateId;
private final byte[] serializedKeyAndNamespace;
+ private final byte[] serializedStateDescriptor;
public KvStateInternalRequest(
final KvStateID stateId,
- final byte[] serializedKeyAndNamespace) {
+ final byte[] serializedKeyAndNamespace,
+ final byte[] serializedStateDescriptor) {
this.kvStateId = Preconditions.checkNotNull(stateId);
this.serializedKeyAndNamespace =
Preconditions.checkNotNull(serializedKeyAndNamespace);
+ this.serializedStateDescriptor =
Preconditions.checkNotNull(serializedStateDescriptor);
}
public KvStateID getKvStateId() {
@@ -55,17 +58,25 @@ public KvStateID getKvStateId() {
return serializedKeyAndNamespace;
}
+ public byte[] getSerializedStateDescriptor() {
+ return serializedStateDescriptor;
+ }
+
@Override
public byte[] serialize() {
// KvStateId + sizeOf(serializedKeyAndNamespace) +
serializedKeyAndNamespace
- final int size = KvStateID.SIZE + Integer.BYTES +
serializedKeyAndNamespace.length;
+ // + sizeOf(serializedStateDescriptor) +
serializedStateDescriptor
+ final int size = KvStateID.SIZE + Integer.BYTES +
serializedKeyAndNamespace.length
+ + Integer.BYTES +
serializedStateDescriptor.length;
return ByteBuffer.allocate(size)
.putLong(kvStateId.getLowerPart())
.putLong(kvStateId.getUpperPart())
.putInt(serializedKeyAndNamespace.length)
.put(serializedKeyAndNamespace)
+ .putInt(serializedStateDescriptor.length)
+ .put(serializedStateDescriptor)
.array();
}
@@ -87,7 +98,17 @@ public KvStateInternalRequest deserializeMessage(ByteBuf
buf) {
if (length > 0) {
buf.readBytes(serializedKeyAndNamespace);
}
- return new KvStateInternalRequest(kvStateId,
serializedKeyAndNamespace);
+
+ int descriptorLength = buf.readInt();
+ Preconditions.checkArgument(descriptorLength >= 0,
+ "Negative length for
key and namespace. " +
+ "This indicates a
serialization error.");
+ byte[] serializedStateDescriptor = new
byte[descriptorLength];
+ if (descriptorLength > 0) {
+ buf.readBytes(serializedStateDescriptor);
+ }
+
+ return new KvStateInternalRequest(kvStateId,
serializedKeyAndNamespace, serializedStateDescriptor);
}
}
}
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
index d46deffeb58..b6de5dd844a 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -19,6 +19,8 @@
package org.apache.flink.queryablestate.server;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.StateDescriptor;
+import
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import
org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
@@ -81,6 +83,14 @@ public KvStateServerHandler(
} else {
byte[] serializedKeyAndNamespace =
request.getSerializedKeyAndNamespace();
+ StateDescriptor<?, ?> requestStateDescriptor =
KvStateSerializer.deserializeStateDescriptor(request.getSerializedStateDescriptor());
+ StateDescriptor<?, ?> registStateDescriptor =
kvState.getStateDescriptor();
+
+
Preconditions.checkArgument(requestStateDescriptor.getType().equals(registStateDescriptor.getType()),
+ "State type mismatch, need[%s]
gotten[%s]", registStateDescriptor.getType(), requestStateDescriptor.getType());
+
Preconditions.checkArgument(requestStateDescriptor.getSerializer().equals(registStateDescriptor.getSerializer()),
+ "State value serializer mismatch, need
[%s] gotten[%s]" , registStateDescriptor.getSerializer(),
requestStateDescriptor.getSerializer());
+
byte[] serializedResult =
getSerializedValue(kvState, serializedKeyAndNamespace);
if (serializedResult != null) {
responseFuture.complete(new
KvStateResponse(serializedResult));
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index bceb3615bde..abc112ddd9f 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -158,7 +158,7 @@ public void channelRead(ChannelHandlerContext ctx, Object
msg) throws Exception
List<CompletableFuture<KvStateResponse>> futures = new
ArrayList<>();
for (long i = 0L; i < numQueries; i++) {
- KvStateInternalRequest request = new
KvStateInternalRequest(new KvStateID(), new byte[0]);
+ KvStateInternalRequest request = new
KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
futures.add(client.sendRequest(serverAddress,
request));
}
@@ -277,7 +277,7 @@ public void testRequestUnavailableHost() throws Exception {
InetAddress.getLocalHost(),
availablePort);
- KvStateInternalRequest request = new
KvStateInternalRequest(new KvStateID(), new byte[0]);
+ KvStateInternalRequest request = new
KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
CompletableFuture<KvStateResponse> future =
client.sendRequest(serverAddress, request);
try {
@@ -356,7 +356,7 @@ public void channelRead(ChannelHandlerContext ctx, Object
msg) throws Exception
List<CompletableFuture<KvStateResponse>>
results = new ArrayList<>(numQueriesPerTask);
for (int i = 0; i < numQueriesPerTask; i++) {
- KvStateInternalRequest request = new
KvStateInternalRequest(new KvStateID(), new byte[0]);
+ KvStateInternalRequest request = new
KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
results.add(finalClient.sendRequest(serverAddress, request));
}
@@ -446,7 +446,7 @@ public void channelRead(ChannelHandlerContext ctx, Object
msg) throws Exception
// Requests
List<Future<KvStateResponse>> futures = new
ArrayList<>();
- KvStateInternalRequest request = new
KvStateInternalRequest(new KvStateID(), new byte[0]);
+ KvStateInternalRequest request = new
KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
futures.add(client.sendRequest(serverAddress, request));
futures.add(client.sendRequest(serverAddress, request));
@@ -555,7 +555,7 @@ public void channelRead(ChannelHandlerContext ctx, Object
msg) throws Exception
InetSocketAddress serverAddress =
getKvStateServerAddress(serverChannel);
// Requests
- KvStateInternalRequest request = new
KvStateInternalRequest(new KvStateID(), new byte[0]);
+ KvStateInternalRequest request = new
KvStateInternalRequest(new KvStateID(), new byte[0], new byte[0]);
Future<KvStateResponse> future =
client.sendRequest(serverAddress, request);
while (!received.get() && deadline.hasTimeLeft()) {
@@ -689,7 +689,7 @@ public void testClientServerIntegration() throws Throwable {
InternalKvState<Integer, ?, Integer> kvState =
(InternalKvState<Integer, ?, Integer>) state;
// Register KvState (one state instance for all
server)
- ids[i] = registry[i].registerKvState(new
JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
+ ids[i] = registry[i].registerKvState(new
JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState, desc);
}
final Client<KvStateInternalRequest, KvStateResponse>
finalClient = client;
@@ -717,8 +717,10 @@ public void testClientServerIntegration() throws Throwable
{
IntSerializer.INSTANCE,
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE);
+ byte[]
serializedStateDescriptor = KvStateSerializer.serializedStateDescriptor(desc);
- KvStateInternalRequest request
= new KvStateInternalRequest(ids[targetServer], serializedKeyAndNamespace);
+ KvStateInternalRequest request
= new KvStateInternalRequest(ids[targetServer],
+
serializedKeyAndNamespace,
serializedStateDescriptor);
futures.add(finalClient.sendRequest(server[targetServer].getServerAddress(),
request));
}
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index adcf3aefb7c..c131be8b5f5 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.queryablestate.network;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -148,13 +149,14 @@ public void testSimpleQuery() throws Exception {
IntSerializer.INSTANCE,
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE);
+ byte[] serializedStateDescriptor =
KvStateSerializer.serializedStateDescriptor(desc);
long requestId = Integer.MAX_VALUE + 182828L;
assertTrue(registryListener.registrationName.equals("vanilla"));
KvStateInternalRequest request = new KvStateInternalRequest(
- registryListener.kvStateId,
serializedKeyAndNamespace);
+ registryListener.kvStateId,
serializedKeyAndNamespace, serializedStateDescriptor);
ByteBuf serRequest =
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
@@ -202,7 +204,7 @@ public void testQueryUnknownKvStateID() throws Exception {
long requestId = Integer.MAX_VALUE + 182828L;
- KvStateInternalRequest request = new KvStateInternalRequest(new
KvStateID(), new byte[0]);
+ KvStateInternalRequest request = new KvStateInternalRequest(new
KvStateID(), new byte[0], new byte[0]);
ByteBuf serRequest =
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
@@ -259,12 +261,13 @@ public void testQueryUnknownKey() throws Exception {
IntSerializer.INSTANCE,
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE);
+ byte[] serializedStateDescriptor =
KvStateSerializer.serializedStateDescriptor(desc);
long requestId = Integer.MAX_VALUE + 22982L;
assertTrue(registryListener.registrationName.equals("vanilla"));
- KvStateInternalRequest request = new
KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
+ KvStateInternalRequest request = new
KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace,
serializedStateDescriptor);
ByteBuf serRequest =
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
// Write the request and wait for the response
@@ -300,51 +303,18 @@ public void testFailureOnGetSerializedValue() throws
Exception {
EmbeddedChannel channel = new
EmbeddedChannel(getFrameDecoder(), handler);
// Failing KvState
- InternalKvState<Integer, VoidNamespace, Long> kvState =
- new InternalKvState<Integer, VoidNamespace,
Long>() {
- @Override
- public TypeSerializer<Integer>
getKeySerializer() {
- return IntSerializer.INSTANCE;
- }
-
- @Override
- public TypeSerializer<VoidNamespace>
getNamespaceSerializer() {
- return
VoidNamespaceSerializer.INSTANCE;
- }
-
- @Override
- public TypeSerializer<Long>
getValueSerializer() {
- return LongSerializer.INSTANCE;
- }
-
- @Override
- public void
setCurrentNamespace(VoidNamespace namespace) {
- // do nothing
- }
-
- @Override
- public byte[] getSerializedValue(
- final byte[]
serializedKeyAndNamespace,
- final
TypeSerializer<Integer> safeKeySerializer,
- final
TypeSerializer<VoidNamespace> safeNamespaceSerializer,
- final
TypeSerializer<Long> safeValueSerializer) throws Exception {
- throw new
RuntimeException("Expected test Exception");
- }
-
- @Override
- public void clear() {
-
- }
- };
+ InternalKvState<Integer, VoidNamespace, Long> kvState =
getFailingState();
+ ValueStateDescriptor<Long> descriptor = new
ValueStateDescriptor("test", LongSerializer.INSTANCE);
KvStateID kvStateId = registry.registerKvState(
new JobID(),
new JobVertexID(),
new KeyGroupRange(0, 0),
"vanilla",
- kvState);
+ kvState,
+ descriptor);
- KvStateInternalRequest request = new
KvStateInternalRequest(kvStateId, new byte[0]);
+ KvStateInternalRequest request = new
KvStateInternalRequest(kvStateId, new byte[0],
KvStateSerializer.serializedStateDescriptor(descriptor));
ByteBuf serRequest =
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
// Write the request and wait for the response
@@ -363,6 +333,92 @@ public void clear() {
assertEquals(1L, stats.getNumFailed());
}
+ @Test
+ public void testFailureOnCheckStateType() throws Exception {
+ KvStateRegistry registry = new KvStateRegistry();
+ AtomicKvStateRequestStats stats = new
AtomicKvStateRequestStats();
+
+ MessageSerializer<KvStateInternalRequest, KvStateResponse>
serializer =
+ new MessageSerializer<>(new
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new
KvStateResponse.KvStateResponseDeserializer());
+
+ KvStateServerHandler handler = new
KvStateServerHandler(testServer, registry, serializer, stats);
+ EmbeddedChannel channel = new
EmbeddedChannel(getFrameDecoder(), handler);
+
+ // Failing KvState
+ InternalKvState<Integer, VoidNamespace, Long> kvState =
getFailingState();
+ ValueStateDescriptor<Long> registDescriptor = new
ValueStateDescriptor("regist", LongSerializer.INSTANCE);
+
+ KvStateID kvStateId = registry.registerKvState(
+ new JobID(),
+ new JobVertexID(),
+ new KeyGroupRange(0, 0),
+ "vanilla",
+ kvState,
+ registDescriptor);
+
+ ListStateDescriptor<Long> requestdescriptor = new
ListStateDescriptor("test", LongSerializer.INSTANCE);
+ KvStateInternalRequest request = new
KvStateInternalRequest(kvStateId, new byte[0],
KvStateSerializer.serializedStateDescriptor(requestdescriptor));
+ ByteBuf serRequest =
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
+
+ // Write the request and wait for the response
+ channel.writeInbound(serRequest);
+
+ ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+ buf.skipBytes(4); // skip frame length
+
+ // Verify the response
+ assertEquals(MessageType.REQUEST_FAILURE,
MessageSerializer.deserializeHeader(buf));
+ RequestFailure response =
MessageSerializer.deserializeRequestFailure(buf);
+
+ assertTrue(response.getCause().getMessage().contains("State
type mismatch"));
+
+ assertEquals(1L, stats.getNumRequests());
+ assertEquals(1L, stats.getNumFailed());
+ }
+
+ @Test
+ public void testFailureOnCheckStateValueType() throws Exception {
+ KvStateRegistry registry = new KvStateRegistry();
+ AtomicKvStateRequestStats stats = new
AtomicKvStateRequestStats();
+
+ MessageSerializer<KvStateInternalRequest, KvStateResponse>
serializer =
+ new MessageSerializer<>(new
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new
KvStateResponse.KvStateResponseDeserializer());
+
+ KvStateServerHandler handler = new
KvStateServerHandler(testServer, registry, serializer, stats);
+ EmbeddedChannel channel = new
EmbeddedChannel(getFrameDecoder(), handler);
+
+ // Failing KvState
+ InternalKvState<Integer, VoidNamespace, Long> kvState =
getFailingState();
+
+ ValueStateDescriptor<Long> registDescriptor = new
ValueStateDescriptor("test", LongSerializer.INSTANCE);
+ KvStateID kvStateId = registry.registerKvState(
+ new JobID(),
+ new JobVertexID(),
+ new KeyGroupRange(0, 0),
+ "vanilla",
+ kvState,
+ registDescriptor);
+
+ ValueStateDescriptor<Long> requestDescriptor = new
ValueStateDescriptor("test", IntSerializer.INSTANCE);
+ KvStateInternalRequest request = new
KvStateInternalRequest(kvStateId, new byte[0],
KvStateSerializer.serializedStateDescriptor(requestDescriptor));
+ ByteBuf serRequest =
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
+
+ // Write the request and wait for the response
+ channel.writeInbound(serRequest);
+
+ ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+ buf.skipBytes(4); // skip frame length
+
+ // Verify the response
+ assertEquals(MessageType.REQUEST_FAILURE,
MessageSerializer.deserializeHeader(buf));
+ RequestFailure response =
MessageSerializer.deserializeRequestFailure(buf);
+
+ assertTrue(response.getCause().getMessage().contains("State
value serializer mismatch"));
+
+ assertEquals(1L, stats.getNumRequests());
+ assertEquals(1L, stats.getNumFailed());
+ }
+
/**
* Tests that the channel is closed if an Exception reaches the channel
handler.
*/
@@ -435,7 +491,7 @@ public void testQueryExecutorShutDown() throws Throwable {
assertTrue(registryListener.registrationName.equals("vanilla"));
- KvStateInternalRequest request = new
KvStateInternalRequest(registryListener.kvStateId, new byte[0]);
+ KvStateInternalRequest request = new
KvStateInternalRequest(registryListener.kvStateId, new byte[0], new byte[0]);
ByteBuf serRequest =
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
// Write the request and wait for the response
@@ -519,7 +575,7 @@ public void testIncomingBufferIsRecycled() throws Exception
{
KvStateServerHandler handler = new
KvStateServerHandler(testServer, registry, serializer, stats);
EmbeddedChannel channel = new
EmbeddedChannel(getFrameDecoder(), handler);
- KvStateInternalRequest request = new KvStateInternalRequest(new
KvStateID(), new byte[0]);
+ KvStateInternalRequest request = new KvStateInternalRequest(new
KvStateID(), new byte[0], new byte[0]);
ByteBuf serRequest =
MessageSerializer.serializeRequest(channel.alloc(), 282872L, request);
assertEquals(1L, serRequest.refCnt());
@@ -588,10 +644,12 @@ public void testSerializerMismatch() throws Exception {
IntSerializer.INSTANCE,
"wrong-namespace-type",
StringSerializer.INSTANCE);
+ byte[] serializedStateDescriptor =
KvStateSerializer.serializedStateDescriptor(desc);
assertTrue(registryListener.registrationName.equals("vanilla"));
- KvStateInternalRequest request = new
KvStateInternalRequest(registryListener.kvStateId, wrongKeyAndNamespace);
+ KvStateInternalRequest request = new
KvStateInternalRequest(registryListener.kvStateId,
+
wrongKeyAndNamespace,
serializedStateDescriptor);
ByteBuf serRequest =
MessageSerializer.serializeRequest(channel.alloc(), 182828L, request);
// Write the request and wait for the response
@@ -607,7 +665,7 @@ public void testSerializerMismatch() throws Exception {
assertTrue(response.getCause().getMessage().contains("IOException"));
// Repeat with wrong namespace only
- request = new
KvStateInternalRequest(registryListener.kvStateId, wrongNamespace);
+ request = new
KvStateInternalRequest(registryListener.kvStateId, wrongNamespace,
serializedStateDescriptor);
serRequest =
MessageSerializer.serializeRequest(channel.alloc(), 182829L, request);
// Write the request and wait for the response
@@ -676,12 +734,13 @@ public void testChunkedResponse() throws Exception {
IntSerializer.INSTANCE,
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE);
+ byte[] serializedStateDescriptor =
KvStateSerializer.serializedStateDescriptor(desc);
long requestId = Integer.MAX_VALUE + 182828L;
assertTrue(registryListener.registrationName.equals("vanilla"));
- KvStateInternalRequest request = new
KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
+ KvStateInternalRequest request = new
KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace,
serializedStateDescriptor);
ByteBuf serRequest =
MessageSerializer.serializeRequest(channel.alloc(), requestId, request);
// Write the request and wait for the response
@@ -765,4 +824,44 @@ public void notifyKvStateUnregistered(JobID jobId,
registry.createTaskRegistry(dummyEnv.getJobID(),
dummyEnv.getJobVertexId()),
TtlTimeProvider.DEFAULT);
}
+
+ private InternalKvState<Integer, VoidNamespace, Long> getFailingState()
{
+ InternalKvState<Integer, VoidNamespace, Long> kvState =
+ new InternalKvState<Integer, VoidNamespace, Long>() {
+ @Override
+ public TypeSerializer<Integer>
getKeySerializer() {
+ return IntSerializer.INSTANCE;
+ }
+
+ @Override
+ public TypeSerializer<VoidNamespace>
getNamespaceSerializer() {
+ return VoidNamespaceSerializer.INSTANCE;
+ }
+
+ @Override
+ public TypeSerializer<Long>
getValueSerializer() {
+ return LongSerializer.INSTANCE;
+ }
+
+ @Override
+ public void setCurrentNamespace(VoidNamespace
namespace) {
+ // do nothing
+ }
+
+ @Override
+ public byte[] getSerializedValue(
+ final byte[] serializedKeyAndNamespace,
+ final TypeSerializer<Integer>
safeKeySerializer,
+ final TypeSerializer<VoidNamespace>
safeNamespaceSerializer,
+ final TypeSerializer<Long>
safeValueSerializer) throws Exception {
+ throw new RuntimeException("Expected
test Exception");
+ }
+
+ @Override
+ public void clear() {
+
+ }
+ };
+ return kvState;
+ }
}
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
index 79c23ad2a2d..754aaea2f17 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -145,6 +145,7 @@ public void testSimpleRequest() throws Throwable {
IntSerializer.INSTANCE,
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE);
+ byte[] serializedStateDescriptor =
KvStateSerializer.serializedStateDescriptor(desc);
// Connect to the server
final BlockingQueue<ByteBuf> responses = new
LinkedBlockingQueue<>();
@@ -167,7 +168,8 @@ public void channelRead(ChannelHandlerContext ctx, Object
msg) throws Exception
final KvStateInternalRequest request = new
KvStateInternalRequest(
registryListener.kvStateId,
- serializedKeyAndNamespace);
+ serializedKeyAndNamespace,
+ serializedStateDescriptor);
ByteBuf serializeRequest =
MessageSerializer.serializeRequest(
channel.alloc(),
diff --git
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
index acaa0671bc0..a36612ff757 100644
---
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
+++
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
@@ -64,8 +64,9 @@ public void testRequestSerialization() throws Exception {
long requestId = Integer.MAX_VALUE + 1337L;
KvStateID kvStateId = new KvStateID();
byte[] serializedKeyAndNamespace = randomByteArray(1024);
+ byte[] serializedStateDescriptor = randomByteArray(1024);
- final KvStateInternalRequest request = new
KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+ final KvStateInternalRequest request = new
KvStateInternalRequest(kvStateId, serializedKeyAndNamespace,
serializedStateDescriptor);
final MessageSerializer<KvStateInternalRequest,
KvStateResponse> serializer =
new MessageSerializer<>(new
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new
KvStateResponse.KvStateResponseDeserializer());
@@ -91,8 +92,9 @@ public void
testRequestSerializationWithZeroLengthKeyAndNamespace() throws Excep
long requestId = Integer.MAX_VALUE + 1337L;
KvStateID kvStateId = new KvStateID();
byte[] serializedKeyAndNamespace = new byte[0];
+ byte[] serializedStateDescriptor = new byte[0];
- final KvStateInternalRequest request = new
KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+ final KvStateInternalRequest request = new
KvStateInternalRequest(kvStateId, serializedKeyAndNamespace,
serializedStateDescriptor);
final MessageSerializer<KvStateInternalRequest,
KvStateResponse> serializer =
new MessageSerializer<>(new
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new
KvStateResponse.KvStateResponseDeserializer());
@@ -115,7 +117,16 @@ public void
testRequestSerializationWithZeroLengthKeyAndNamespace() throws Excep
*/
@Test(expected = NullPointerException.class)
public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace()
throws Exception {
- new KvStateInternalRequest(new KvStateID(), null);
+ new KvStateInternalRequest(new KvStateID(), null, new byte[0]);
+ }
+
+ /**
+ * Tests that we don't try to be smart about <code>null</code>
stateDescriptor.
+ * They should be treated explicitly.
+ */
+ @Test(expected = NullPointerException.class)
+ public void testNullPointerExceptionOnNullSerializedStateDescriptor()
throws Exception {
+ new KvStateInternalRequest(new KvStateID(), new byte[0], null);
}
/**
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
index 0bd132f6e05..e9fdd6633ab 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateEntry.java
@@ -20,6 +20,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.Preconditions;
@@ -38,13 +39,15 @@
private final InternalKvState<K, N, V> state;
private final KvStateInfo<K, N, V> stateInfo;
+ private final StateDescriptor<?, ?> stateDescriptor;
private final boolean areSerializersStateless;
private final ConcurrentMap<Thread, KvStateInfo<K, N, V>>
serializerCache;
- public KvStateEntry(final InternalKvState<K, N, V> state) {
+ public KvStateEntry(final InternalKvState<K, N, V> state, final
StateDescriptor<?, ?> stateDescriptor) {
this.state = Preconditions.checkNotNull(state);
+ this.stateDescriptor =
Preconditions.checkNotNull(stateDescriptor);
this.stateInfo = new KvStateInfo<>(
state.getKeySerializer(),
state.getNamespaceSerializer(),
@@ -64,6 +67,10 @@ public KvStateEntry(final InternalKvState<K, N, V> state) {
:
serializerCache.computeIfAbsent(Thread.currentThread(), t ->
stateInfo.duplicate());
}
+ public StateDescriptor getStateDescriptor() {
+ return this.stateDescriptor;
+ }
+
public void clear() {
serializerCache.clear();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index 430b06bd7d1..14165f41287 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.query;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -81,15 +82,16 @@ public void unregisterListener(JobID jobId) {
* @return Assigned KvStateID
*/
public KvStateID registerKvState(
- JobID jobId,
- JobVertexID jobVertexId,
- KeyGroupRange keyGroupRange,
- String registrationName,
- InternalKvState<?, ?, ?> kvState) {
+ JobID jobId,
+ JobVertexID jobVertexId,
+ KeyGroupRange keyGroupRange,
+ String registrationName,
+ InternalKvState<?, ?, ?> kvState,
+ StateDescriptor<?, ?> stateDescriptor) {
KvStateID kvStateId = new KvStateID();
- if (registeredKvStates.putIfAbsent(kvStateId, new
KvStateEntry<>(kvState)) == null) {
+ if (registeredKvStates.putIfAbsent(kvStateId, new
KvStateEntry(kvState, stateDescriptor)) == null) {
final KvStateRegistryListener listener =
getKvStateRegistryListener(jobId);
if (listener != null) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
index a44a508ecbc..bfa6d724c0e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.query;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -58,11 +59,12 @@
* @param registrationName The registration name (not necessarily the
same
* as the KvState name defined in the state
* descriptor used to create the KvState
instance)
- * @param kvState The
+ * @param kvState The internal KvState instance
+ * @param stateDescriptor The descriptor of the state
*/
- public void registerKvState(KeyGroupRange keyGroupRange, String
registrationName, InternalKvState<?, ?, ?> kvState) {
- KvStateID kvStateId = registry.registerKvState(jobId,
jobVertexId, keyGroupRange, registrationName, kvState);
- registeredKvStates.add(new KvStateInfo(keyGroupRange,
registrationName, kvStateId));
+ public void registerKvState(KeyGroupRange keyGroupRange, String
registrationName, InternalKvState<?, ?, ?> kvState, StateDescriptor<?, ?>
stateDescriptor) {
+ KvStateID kvStateId = registry.registerKvState(jobId,
jobVertexId, keyGroupRange, registrationName, kvState, stateDescriptor);
+ registeredKvStates.add(new KvStateInfo(keyGroupRange,
registrationName, kvStateId, stateDescriptor));
}
/**
@@ -85,7 +87,7 @@ public void unregisterAll() {
private final KvStateID kvStateId;
- KvStateInfo(KeyGroupRange keyGroupRange, String
registrationName, KvStateID kvStateId) {
+ KvStateInfo(KeyGroupRange keyGroupRange, String
registrationName, KvStateID kvStateId, StateDescriptor stateDescriptor) {
this.keyGroupRange = keyGroupRange;
this.registrationName = registrationName;
this.kvStateId = kvStateId;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 17d24f77472..a8dc3064bd3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -251,7 +251,7 @@ private void publishQueryableStateIfEnabled(
throw new IllegalStateException("State backend
has not been initialized for job.");
}
String name = stateDescriptor.getQueryableStateName();
- kvStateRegistry.registerKvState(keyGroupRange, name,
kvState);
+ kvStateRegistry.registerKvState(keyGroupRange, name,
kvState, stateDescriptor);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
index c1c56bf250b..deef7fe49be 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java
@@ -19,10 +19,12 @@
package org.apache.flink.runtime.query;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.queryablestate.KvStateID;
@@ -77,7 +79,8 @@ public void testKvStateEntry() throws InterruptedException {
jobVertexId,
keyGroupRange,
registrationName,
- new DummyKvState()
+ new DummyKvState(),
+ getDummyStateDescriptor()
);
final AtomicReference<Throwable> exceptionHolder = new
AtomicReference<>();
@@ -173,7 +176,8 @@ public void testKvStateRegistryListenerNotification() {
jobVertexId,
keyGroupRange,
registrationName,
- new DummyKvState());
+ new DummyKvState(),
+ getDummyStateDescriptor());
assertThat(registeredNotifications1.poll(), equalTo(jobId1));
assertThat(registeredNotifications2.isEmpty(), is(true));
@@ -186,7 +190,8 @@ public void testKvStateRegistryListenerNotification() {
jobVertexId2,
keyGroupRange2,
registrationName2,
- new DummyKvState());
+ new DummyKvState(),
+ getDummyStateDescriptor());
assertThat(registeredNotifications2.poll(), equalTo(jobId2));
assertThat(registeredNotifications1.isEmpty(), is(true));
@@ -244,7 +249,8 @@ public void testLegacyCodePathPreference() {
jobVertexId,
keyGroupRange,
registrationName,
- new DummyKvState());
+ new DummyKvState(),
+ getDummyStateDescriptor());
assertThat(stateRegistrationNotifications.poll(),
equalTo(jobId));
// another listener should not have received any notifications
@@ -410,4 +416,9 @@ public TypeSerializerConfigSnapshot snapshotConfiguration()
{
return null;
}
}
+
+ private ValueStateDescriptor<String> getDummyStateDescriptor() {
+ ValueStateDescriptor<String> dummyDescriptor = new
ValueStateDescriptor<String>("desriptor", StringSerializer.INSTANCE);
+ return dummyDescriptor;
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Queryable state should check types
> ----------------------------------
>
> Key: FLINK-8058
> URL: https://issues.apache.org/jira/browse/FLINK-8058
> Project: Flink
> Issue Type: Improvement
> Components: Queryable State
> Affects Versions: 1.4.0
> Reporter: Chesnay Schepler
> Assignee: Congxian Qiu
> Priority: Major
> Labels: pull-request-available
>
> The queryable state currently doesn't do any type checks on the client or
> server and generally relies on serializers to catch errors.
> Neither the type of state is checked (ValueState, ListState etc.) nor the
> type of contained values.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)