Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r177690430
  
    --- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
 ---
    @@ -124,10 +134,56 @@ public void testListSerialization() throws Exception {
                        .createListState(VoidNamespaceSerializer.INSTANCE,
                                new ListStateDescriptor<>("test", 
LongSerializer.INSTANCE));
     
    -           KvStateRequestSerializerTest.testListSerialization(key, 
listState);
    +           testListSerialization(key, (RocksDBListState<Long, 
VoidNamespace, Long>) listState);
                longHeapKeyedStateBackend.dispose();
        }
     
    +   /**
    +    * Verifies that the serialization of a list using the given list state
    +    * matches the deserialization with {@link 
KvStateSerializer#deserializeList}.
    +    *
    +    * @param key
    +    *              key of the list state
    +    * @param listState
    +    *              list state using the {@link VoidNamespace}, must also 
be a {@link RocksDBListState} instance
    +    *
    +    * @throws Exception
    +    */
    +   private void testListSerialization(
    +                   final long key,
    +                   final RocksDBListState<Long, VoidNamespace, Long> 
listState) throws Exception {
    +
    +           TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
    +           listState.setCurrentNamespace(VoidNamespace.INSTANCE);
    +
    +           // List
    +           final int numElements = 10;
    +
    +           final List<Long> expectedValues = new ArrayList<>();
    +           for (int i = 0; i < numElements; i++) {
    +                   final long value = 
ThreadLocalRandom.current().nextLong();
    +                   expectedValues.add(value);
    +                   listState.add(value);
    +           }
    +
    +           final byte[] serializedKey =
    +                           KvStateSerializer.serializeKeyAndNamespace(
    +                                           key, LongSerializer.INSTANCE,
    +                                           VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE);
    +
    +           final byte[] serializedValues = 
listState.getSerializedValue(serializedKey);
    +
    +           List<Long> actualValues = 
KvStateSerializer.deserializeList(serializedValues, valueSerializer);
    +           assertEquals(expectedValues, actualValues);
    +
    +           // Single value
    +           long expectedValue = ThreadLocalRandom.current().nextLong();
    --- End diff --
    
    nit: why is this using `ThreadLocalRandom`?


---

Reply via email to