Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5578#discussion_r170953600 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java --- @@ -126,30 +133,54 @@ public void storeLocalState( LOG.info("Storing local state for checkpoint {}.", checkpointId); LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState); - Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16); + Map.Entry<Long, TaskStateSnapshot> toDiscard = null; synchronized (lock) { if (disposed) { // we ignore late stores and simply discard the state. - toDiscard.put(checkpointId, localState); + toDiscard = new AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, localState); } else { TaskStateSnapshot previous = storedTaskStateByCheckpointID.put(checkpointId, localState); if (previous != null) { - toDiscard.put(checkpointId, previous); + toDiscard = new AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, previous); } } } - asyncDiscardLocalStateForCollection(toDiscard.entrySet()); + if (toDiscard != null) { + asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard)); + } } @Override @Nullable public TaskStateSnapshot retrieveLocalState(long checkpointID) { synchronized (lock) { TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID); + + Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator = + storedTaskStateByCheckpointID.entrySet().iterator(); + + if (retrieveWithDiscard) { + // Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others + final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>(); --- End diff -- I think we can de-duplicate code by extracting a method with the code from `confirmCheckpoint(...)`, maybe called `pruneCheckpoints(...)`. 
We can do the comparison for both use cases as `entryCheckpointId != checkpointID` and give the extracted method a boolean parameter that determines whether we break out of the iteration in the `else` case.
---