GitHub user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168718667
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
---
@@ -166,20 +190,38 @@ public TaskStateSnapshot retrieveLocalState(long checkpointID) {
* Disposes the state of all local snapshots managed by this object.
*/
public void dispose() {
+
+ Collection<Map.Entry<Long, TaskStateSnapshot>> statesCopy;
+
synchronized (lock) {
- for (Map.Entry<Long, TaskStateSnapshot> entry : storedTaskStateByCheckpointID.entrySet()) {
- discardStateObject(entry.getValue(), entry.getKey());
- }
discarded = true;
+ statesCopy = new ArrayList<>(storedTaskStateByCheckpointID.entrySet());
}
+
+ discardExecutor.execute(() -> {
--- End diff --
@tillrohrmann `ExecutorService` has a shutdown method (the one we call)
that waits until all submitted futures are completed. Do you see any additional
value in tracking the future completion individually?
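
For reference, a minimal sketch of the shutdown-based approach, using only `java.util.concurrent`. In the standard API, `shutdown()` itself does not block: it stops new submissions and lets already-submitted tasks run, while `awaitTermination(...)` is the call that actually waits. The class name `DiscardShutdownSketch`, the method `discardAllAndShutdown`, the counter, and the 10-second timeout are hypothetical and only illustrate the pattern; this is not the code in `TaskLocalStateStore`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DiscardShutdownSketch {

    /**
     * Submits one (hypothetical) discard task per snapshot, then shuts the
     * executor down and waits for termination. Returns how many tasks ran.
     */
    static int discardAllAndShutdown(int numSnapshots) {
        ExecutorService discardExecutor = Executors.newSingleThreadExecutor();
        AtomicInteger discardedCount = new AtomicInteger();

        for (int i = 0; i < numSnapshots; i++) {
            // Stand-in for discarding one snapshot's state.
            discardExecutor.execute(() -> discardedCount.incrementAndGet());
        }

        // Rejects new tasks; previously submitted tasks still run.
        discardExecutor.shutdown();
        try {
            // Blocks until all submitted tasks finish or the timeout elapses.
            if (!discardExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                // Timeout: cancel whatever is still pending.
                discardExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            discardExecutor.shutdownNow();
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        return discardedCount.get();
    }

    public static void main(String[] args) {
        System.out.println("discarded: " + discardAllAndShutdown(3));
    }
}
```

With this pattern no per-task future needs to be tracked for completion. Individual futures would mainly add value if a caller needed the result or failure of one specific discard.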
---