Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r168514168
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
---
@@ -166,20 +190,38 @@ public TaskStateSnapshot retrieveLocalState(long
checkpointID) {
* Disposes the state of all local snapshots managed by this object.
*/
public void dispose() {
+
+ Collection<Map.Entry<Long, TaskStateSnapshot>> statesCopy;
+
synchronized (lock) {
- for (Map.Entry<Long, TaskStateSnapshot> entry :
storedTaskStateByCheckpointID.entrySet()) {
- discardStateObject(entry.getValue(),
entry.getKey());
- }
discarded = true;
+ statesCopy = new
ArrayList<>(storedTaskStateByCheckpointID.entrySet());
}
+
+ discardExecutor.execute(() -> {
--- End diff --
I think it would be good for these kind of asynchronous clean up operations
to return a `CompletableFuture`. This future could then be returned by the
`dispose` method. The benefit would be that the caller would know when the
clean up has completed and, thus, would be safe to shut down the
`discardExecutor`.
---