[ 
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874568#comment-15874568
 ] 

ASF GitHub Bot commented on FLINK-4856:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3336#discussion_r101995663
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
 ---
    @@ -410,6 +415,124 @@ public void testDeserializeListTooShort2() throws 
Exception {
                KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 
1, 1, 1, 1, 1, 2, 3},
                        LongSerializer.INSTANCE);
        }
    +   
    +   /**
    +    * Tests map serialization utils.
    +    */
    +   @Test
    +   public void testMapSerialization() throws Exception {
    +           final long key = 0L;
    +
    +           // objects for heap state list serialisation
    +           final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
    +                   new HeapKeyedStateBackend<>(
    +                           mock(TaskKvStateRegistry.class),
    +                           LongSerializer.INSTANCE,
    +                           ClassLoader.getSystemClassLoader(),
    +                           1, new KeyGroupRange(0, 0)
    +                   );
    +           longHeapKeyedStateBackend.setCurrentKey(key);
    +
    +           final InternalMapState<VoidNamespace, Long, String> mapState = 
longHeapKeyedStateBackend.createMapState(
    +                           VoidNamespaceSerializer.INSTANCE,
    +                           new MapStateDescriptor<>("test", 
LongSerializer.INSTANCE, StringSerializer.INSTANCE));
    +
    +           testMapSerialization(key, mapState);
    +   }
    +
    +   /**
    +    * Verifies that the serialization of a map using the given map state
    +    * matches the deserialization with {@link 
KvStateRequestSerializer#deserializeList}.
    +    *
    +    * @param key
    +    *              key of the map state
    +    * @param mapState
    +    *              map state using the {@link VoidNamespace}, must also be 
a {@link InternalKvState} instance
    +    *
    +    * @throws Exception
    +    */
    +   public static void testMapSerialization(
    +                   final long key,
    +                   final InternalMapState<VoidNamespace, Long, String> 
mapState) throws Exception {
    +
    +           TypeSerializer<Long> userKeySerializer = 
LongSerializer.INSTANCE;
    +           TypeSerializer<String> userValueSerializer = 
StringSerializer.INSTANCE;
    +           mapState.setCurrentNamespace(VoidNamespace.INSTANCE);
    +
    +           // List
    +           final int numElements = 10;
    +
    +           final Map<Long, String> expectedValues = new HashMap<>();
    +           for (int i = 0; i < numElements; i++) {
    +                   final long value = 
ThreadLocalRandom.current().nextLong();
    --- End diff --
    
    Although it probably doesn't matter too much here, in general I would 
suggest to use random generators with a seed, so that in case a test fails, it 
is easier to reproduce the failing case.


> Add MapState for keyed streams
> ------------------------------
>
>                 Key: FLINK-4856
>                 URL: https://issues.apache.org/jira/browse/FLINK-4856
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API, State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently, 
> these states are implemented by storing the entire map into a ValueState or a 
> ListState. The implementation however is very costly because all entries have 
> to be serialized/deserialized when updating a single entry. To improve the 
> efficiency of these states, MapStates are urgently needed. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to