GitHub user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5691#discussion_r177690430
--- Diff:
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
---
@@ -124,10 +134,56 @@ public void testListSerialization() throws Exception {
.createListState(VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test",
LongSerializer.INSTANCE));
- KvStateRequestSerializerTest.testListSerialization(key,
listState);
+ testListSerialization(key, (RocksDBListState<Long,
VoidNamespace, Long>) listState);
longHeapKeyedStateBackend.dispose();
}
+ /**
+ * Verifies that the serialization of a list using the given list state
+ * matches the deserialization with {@link
KvStateSerializer#deserializeList}.
+ *
+ * @param key
+ * key of the list state
+ * @param listState
+ * list state using the {@link VoidNamespace}, must also
be a {@link RocksDBListState} instance
+ *
+ * @throws Exception
+ */
+ private void testListSerialization(
+ final long key,
+ final RocksDBListState<Long, VoidNamespace, Long>
listState) throws Exception {
+
+ TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
+ listState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+ // List
+ final int numElements = 10;
+
+ final List<Long> expectedValues = new ArrayList<>();
+ for (int i = 0; i < numElements; i++) {
+ final long value =
ThreadLocalRandom.current().nextLong();
+ expectedValues.add(value);
+ listState.add(value);
+ }
+
+ final byte[] serializedKey =
+ KvStateSerializer.serializeKeyAndNamespace(
+ key, LongSerializer.INSTANCE,
+ VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE);
+
+ final byte[] serializedValues =
listState.getSerializedValue(serializedKey);
+
+ List<Long> actualValues =
KvStateSerializer.deserializeList(serializedValues, valueSerializer);
+ assertEquals(expectedValues, actualValues);
+
+ // Single value
+ long expectedValue = ThreadLocalRandom.current().nextLong();
--- End diff --
nit: why is this using `ThreadLocalRandom`?
---