Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r170270552 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java --- @@ -166,20 +190,38 @@ public TaskStateSnapshot retrieveLocalState(long checkpointID) { * Disposes the state of all local snapshots managed by this object. */ public void dispose() { + + Collection<Map.Entry<Long, TaskStateSnapshot>> statesCopy; + synchronized (lock) { - for (Map.Entry<Long, TaskStateSnapshot> entry : storedTaskStateByCheckpointID.entrySet()) { - discardStateObject(entry.getValue(), entry.getKey()); - } discarded = true; + statesCopy = new ArrayList<>(storedTaskStateByCheckpointID.entrySet()); } + + discardExecutor.execute(() -> { --- End diff -- 👍
---