[ https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16419114#comment-16419114 ]
ASF GitHub Bot commented on FLINK-8802:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5691#discussion_r177690430
--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java ---
@@ -124,10 +134,56 @@ public void testListSerialization() throws Exception {
.createListState(VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
- KvStateRequestSerializerTest.testListSerialization(key, listState);
+ testListSerialization(key, (RocksDBListState<Long, VoidNamespace, Long>) listState);
longHeapKeyedStateBackend.dispose();
}
+ /**
+ * Verifies that the serialization of a list using the given list state
+ * matches the deserialization with {@link KvStateSerializer#deserializeList}.
+ *
+ * @param key
+ * key of the list state
+ * @param listState
+ * list state using the {@link VoidNamespace}, must also be a {@link RocksDBListState} instance
+ *
+ * @throws Exception
+ */
+ private void testListSerialization(
+ final long key,
+ final RocksDBListState<Long, VoidNamespace, Long> listState) throws Exception {
+
+ TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
+ listState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+ // List
+ final int numElements = 10;
+
+ final List<Long> expectedValues = new ArrayList<>();
+ for (int i = 0; i < numElements; i++) {
+ final long value = ThreadLocalRandom.current().nextLong();
+ expectedValues.add(value);
+ listState.add(value);
+ }
+
+ final byte[] serializedKey =
+ KvStateSerializer.serializeKeyAndNamespace(
+ key, LongSerializer.INSTANCE,
+ VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
+
+ final byte[] serializedValues = listState.getSerializedValue(serializedKey);
+
+ List<Long> actualValues = KvStateSerializer.deserializeList(serializedValues, valueSerializer);
+ assertEquals(expectedValues, actualValues);
+
+ // Single value
+ long expectedValue = ThreadLocalRandom.current().nextLong();
--- End diff --
nit: why is this using `ThreadLocalRandom`?
> Concurrent serialization without duplicating serializers in state server.
> -------------------------------------------------------------------------
>
> Key: FLINK-8802
> URL: https://issues.apache.org/jira/browse/FLINK-8802
> Project: Flink
> Issue Type: Bug
> Components: Queryable State
> Affects Versions: 1.5.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Blocker
> Fix For: 1.5.0
>
>
> `getSerializedValue()` may be called by multiple threads, but the serializers
> are not duplicated, which may lead to exceptions when a serializer is
> stateful.
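
To illustrate the issue description, here is a minimal, self-contained sketch and not Flink's actual code. `StatefulSerializer` is a hypothetical stand-in for a serializer that reuses an internal buffer, and its `duplicate()` method only mirrors the idea behind `TypeSerializer#duplicate()`. All class and method names below are assumptions made for illustration. Each worker thread takes its own duplicate, so the internal buffer is never shared between threads.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SerializerDuplication {

    /** Hypothetical stateful serializer: reuses one internal buffer across calls. */
    static final class StatefulSerializer {
        private final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);

        byte[] serialize(long value) {
            buffer.clear();
            buffer.putLong(value);
            // Copy the bytes out so callers never see the reused buffer.
            return buffer.array().clone();
        }

        long deserialize(byte[] bytes) {
            buffer.clear();
            buffer.put(bytes);
            buffer.flip();
            return buffer.getLong();
        }

        /** Returns an independent instance with its own buffer. */
        StatefulSerializer duplicate() {
            return new StatefulSerializer();
        }
    }

    /** Round-trips values on several threads, each using its own duplicate. */
    static boolean roundTripConcurrently(
            StatefulSerializer shared, int threads, int perThread) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final long base = (long) t * perThread;
                results.add(pool.submit(() -> {
                    // Duplicate inside the task: no state is shared across threads.
                    StatefulSerializer local = shared.duplicate();
                    for (long v = base; v < base + perThread; v++) {
                        if (local.deserialize(local.serialize(v)) != v) {
                            return false;
                        }
                    }
                    return true;
                }));
            }
            boolean ok = true;
            for (Future<Boolean> r : results) {
                ok &= r.get();
            }
            return ok;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTripConcurrently(new StatefulSerializer(), 4, 1000));
    }
}
```

If the tasks called `shared.serialize(...)` directly instead of going through `duplicate()`, several threads would write into the same buffer at once, which is the kind of failure the issue describes.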