Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5934#discussion_r185508430
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
---
@@ -3594,6 +3599,58 @@ public String fold(String acc, Integer value) throws
Exception {
}
}
+ @Test
+ public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync()
throws Exception {
+
+ CheckpointStreamFactory streamFactory = createStreamFactory();
+ Environment env = new DummyEnvironment();
+ AbstractKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE, env);
+
+ ExecutorService executorService =
Executors.newScheduledThreadPool(1);
+ try {
+ long checkpointID = 0;
+ List<Future> futureList = new ArrayList();
+ for (int i = 0; i < 10; ++i) {
+ ValueStateDescriptor<Integer> kvId = new
ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE);
+ ValueState<Integer> state =
backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+ ((InternalValueState)
state).setCurrentNamespace(VoidNamespace.INSTANCE);
+ backend.setCurrentKey(i);
+ state.update(i);
+
+ futureList.add(runSnapshotAsync(executorService,
+ backend.snapshot(checkpointID++,
System.currentTimeMillis(), streamFactory,
CheckpointOptions.forCheckpointWithDefaultLocation())));
+ }
+
+ for (Future future : futureList) {
+ future.get(10, TimeUnit.SECONDS);
+ }
+ } catch (Exception e) {
+ fail();
+ } finally {
+ backend.dispose();
+ executorService.shutdown();
+ }
+ }
+
+ protected Future<SnapshotResult<KeyedStateHandle>> runSnapshotAsync(
+ ExecutorService executorService,
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
snapshotRunnableFuture) throws Exception {
+
+ if (!snapshotRunnableFuture.isDone()) {
--- End diff --
Sorry, my bad, I overlooked that you are using the return value. I will
revert this to your first approach before merging because this does not really
improve it.
---