[ https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398264#comment-16398264 ]
ASF GitHub Bot commented on FLINK-8802:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5691#discussion_r174385024
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java ---
@@ -40,6 +54,81 @@
*/
public class KvStateRegistryTest extends TestLogger {
+ @Test
+ public void testKvStateEntry() throws InterruptedException {
+ final int threads = 10;
+
+ final CountDownLatch latch1 = new CountDownLatch(threads);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ final List<KvStateInfo<?, ?, ?>> infos =
Collections.synchronizedList(new ArrayList<>());
+
+ final JobID jobID = new JobID();
+
+ final JobVertexID jobVertexId = new JobVertexID();
+ final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
+ final String registrationName = "foobar";
+
+ final KvStateRegistry kvStateRegistry = new KvStateRegistry();
+ final KvStateID stateID = kvStateRegistry.registerKvState(
+ jobID,
+ jobVertexId,
+ keyGroupRange,
+ registrationName,
+ new DummyKvState()
+ );
+
+ for (int i = 0; i < threads; i++) {
+ new Thread(() -> {
+ final KvStateEntry<?, ?, ?> kvState =
kvStateRegistry.getKvState(stateID);
+ final KvStateInfo<?, ?, ?> stateInfo =
kvState.getInfoForCurrentThread();
+ infos.add(stateInfo);
+
+ latch1.countDown();
+ try {
+ latch2.await();
+ } catch (InterruptedException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ }).start();
+ }
+
+ latch1.await();
+
+ final KvStateEntry<?, ?, ?> kvState =
kvStateRegistry.getKvState(stateID);
+
+ // verify that all the threads are done correctly.
+ Assert.assertEquals(threads, infos.size());
+ Assert.assertEquals(threads, kvState.getCacheSize());
+
+ latch2.countDown();
+
+ for (KvStateInfo<?, ?, ?> infoA: infos) {
+ boolean found = false;
+ for (KvStateInfo<?, ?, ?> infoB: infos) {
+ if (infoA == infoB) {
+ if (found) {
+ Assert.fail("Already found");
--- End diff --
This failure needs a better error message: "Already found" does not say what was found or why that is an error.
> Concurrent serialization without duplicating serializers in state server.
> -------------------------------------------------------------------------
>
> Key: FLINK-8802
> URL: https://issues.apache.org/jira/browse/FLINK-8802
> Project: Flink
> Issue Type: Bug
> Components: Queryable State
> Affects Versions: 1.5.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Blocker
> Fix For: 1.5.0
>
>
> `getSerializedValue()` may be called by multiple threads concurrently, but the
> serializers are not duplicated, which may lead to exceptions being thrown when
> a serializer is stateful.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)