[
https://issues.apache.org/jira/browse/FLINK-4856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874568#comment-15874568
]
ASF GitHub Bot commented on FLINK-4856:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3336#discussion_r101995663
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
---
@@ -410,6 +415,124 @@ public void testDeserializeListTooShort2() throws Exception {
KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3},
LongSerializer.INSTANCE);
}
+
+ /**
+ * Tests map serialization utils.
+ */
+ @Test
+ public void testMapSerialization() throws Exception {
+ final long key = 0L;
+
+ // objects for heap state list serialisation
+ final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
+ new HeapKeyedStateBackend<>(
+ mock(TaskKvStateRegistry.class),
+ LongSerializer.INSTANCE,
+ ClassLoader.getSystemClassLoader(),
+ 1, new KeyGroupRange(0, 0)
+ );
+ longHeapKeyedStateBackend.setCurrentKey(key);
+
+ final InternalMapState<VoidNamespace, Long, String> mapState = longHeapKeyedStateBackend.createMapState(
+ VoidNamespaceSerializer.INSTANCE,
+ new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+
+ testMapSerialization(key, mapState);
+ }
+
+ /**
+ * Verifies that the serialization of a map using the given map state
+ * matches the deserialization with {@link KvStateRequestSerializer#deserializeList}.
+ *
+ * @param key
+ * key of the map state
+ * @param mapState
+ * map state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance
+ *
+ * @throws Exception
+ */
+ public static void testMapSerialization(
+ final long key,
+ final InternalMapState<VoidNamespace, Long, String> mapState) throws Exception {
+
+ TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
+ TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;
+ mapState.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+ // List
+ final int numElements = 10;
+
+ final Map<Long, String> expectedValues = new HashMap<>();
+ for (int i = 0; i < numElements; i++) {
+ final long value = ThreadLocalRandom.current().nextLong();
--- End diff --
Although it probably doesn't matter much here, in general I would
suggest using random generators with a fixed seed, so that when a test fails
it is easier to reproduce the failing case.
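
As a rough illustration of the suggestion, the sketch below generates the test values from a `java.util.Random` constructed with an explicit seed. `ThreadLocalRandom` cannot be used this way, because its `setSeed` method throws `UnsupportedOperationException`. The class name `SeededRandomExample`, the helper `generateValues`, and the seed value `42L` are hypothetical and not part of the pull request; this is only one possible shape for the change.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class SeededRandomExample {

    // Hypothetical fixed seed (an assumption for this sketch). Any constant
    // works; printing it on failure lets the failing run be replayed.
    static final long SEED = 42L;

    // Builds the expected key/value pairs from a seeded generator, so the
    // same seed always produces the same map contents.
    static Map<Long, String> generateValues(long seed, int numElements) {
        Random random = new Random(seed);
        Map<Long, String> values = new HashMap<>();
        for (int i = 0; i < numElements; i++) {
            long value = random.nextLong();
            values.put(value, Long.toString(value));
        }
        return values;
    }

    public static void main(String[] args) {
        Map<Long, String> firstRun = generateValues(SEED, 10);
        Map<Long, String> secondRun = generateValues(SEED, 10);
        // Both runs use the same seed, so the generated data is identical.
        System.out.println("seed=" + SEED + " reproducible=" + firstRun.equals(secondRun));
    }
}
```

A common variation is to pick the seed from the clock once per run and include it in the assertion message, which keeps the inputs varied while still making a failure replayable.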
> Add MapState for keyed streams
> ------------------------------
>
> Key: FLINK-4856
> URL: https://issues.apache.org/jira/browse/FLINK-4856
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API, State Backends, Checkpointing
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> Many states in keyed streams are organized as key-value pairs. Currently,
> these states are implemented by storing the entire map in a ValueState or a
> ListState. This implementation is very costly, however, because all entries
> have to be serialized/deserialized when updating a single entry. To improve
> the efficiency of these states, MapStates are urgently needed.