Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5934#discussion_r185485401
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ---
    @@ -3594,6 +3599,58 @@ public String fold(String acc, Integer value) throws 
Exception {
                }
        }
     
    +   @Test
    +   public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() 
throws Exception {
    +
    +           CheckpointStreamFactory streamFactory = createStreamFactory();
    +           Environment env = new DummyEnvironment();
    +           AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
    +
    +           ExecutorService executorService = 
Executors.newScheduledThreadPool(1);
    +           try {
    +                   long checkpointID = 0;
    +                   List<Future> futureList = new ArrayList();
    +                   for (int i = 0; i < 10; ++i) {
    +                           ValueStateDescriptor<Integer> kvId = new 
ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE);
    +                           ValueState<Integer> state = 
backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
    +                           ((InternalValueState) 
state).setCurrentNamespace(VoidNamespace.INSTANCE);
    +                           backend.setCurrentKey(i);
    +                           state.update(i);
    +
    +                           futureList.add(runSnapshotAsync(executorService,
    +                                   backend.snapshot(checkpointID++, 
System.currentTimeMillis(), streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation())));
    +                   }
    +
    +                   for (Future future : futureList) {
    +                           future.get(10, TimeUnit.SECONDS);
    +                   }
    +           } catch (Exception e) {
    +                   fail();
    +           } finally {
    +                   backend.dispose();
    +                   executorService.shutdown();
    +           }
    +   }
    +
    +   protected Future<SnapshotResult<KeyedStateHandle>> runSnapshotAsync(
    +           ExecutorService executorService,
    +           RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshotRunnableFuture) throws Exception {
    +
    +           if (!snapshotRunnableFuture.isDone()) {
    --- End diff --
    
    👍 


---

Reply via email to