GitHub user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5578#discussion_r170953600
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
---
@@ -126,30 +133,54 @@ public void storeLocalState(
LOG.info("Storing local state for checkpoint {}.", checkpointId);
LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState);
- Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
+ Map.Entry<Long, TaskStateSnapshot> toDiscard = null;
synchronized (lock) {
if (disposed) {
// we ignore late stores and simply discard the state.
- toDiscard.put(checkpointId, localState);
+ toDiscard = new AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, localState);
} else {
TaskStateSnapshot previous = storedTaskStateByCheckpointID.put(checkpointId, localState);
if (previous != null) {
- toDiscard.put(checkpointId, previous);
+ toDiscard = new AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, previous);
}
}
}
- asyncDiscardLocalStateForCollection(toDiscard.entrySet());
+ if (toDiscard != null) {
+ asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
+ }
}
@Override
@Nullable
public TaskStateSnapshot retrieveLocalState(long checkpointID) {
synchronized (lock) {
TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID);
+
+ Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
+ storedTaskStateByCheckpointID.entrySet().iterator();
+
+ if (retrieveWithDiscard) {
+ // Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others
+ final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
--- End diff --
I think we can de-duplicate code by extracting a method with the code from
`confirmCheckpoint(...)`, maybe called `pruneCheckpoints(...)`. We can do the
comparison for both use-cases as `entryCheckpointId != checkpointID` and have a
boolean parameter which determines if we break the iteration in the `else` case
or not.
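
As a rough, standalone sketch of the suggestion above (not the actual Flink code): the class name, the generic state type `S`, the `breakOnMatch` parameter name and the returned list are all assumptions made for illustration. In the real class the loop would presumably run inside `synchronized (lock)` and the removed entries would be handed to `asyncDiscardLocalStateForCollection(...)`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for the relevant part of TaskLocalStateStoreImpl.
class PruneCheckpointsSketch {

    /**
     * Removes every entry whose key differs from checkpointID and returns the removed entries.
     * If breakOnMatch is true, iteration stops at the matching entry (confirmCheckpoint-style:
     * only older checkpoints are pruned, relying on the map being sorted). If it is false,
     * iteration continues, so only the matching entry survives (retrieve-with-discard-style).
     */
    static <S> List<Map.Entry<Long, S>> pruneCheckpoints(
            SortedMap<Long, S> stored, long checkpointID, boolean breakOnMatch) {

        List<Map.Entry<Long, S>> toRemove = new ArrayList<>();
        Iterator<Map.Entry<Long, S>> entryIterator = stored.entrySet().iterator();

        while (entryIterator.hasNext()) {
            Map.Entry<Long, S> entry = entryIterator.next();
            long entryCheckpointId = entry.getKey();

            if (entryCheckpointId != checkpointID) {
                // Copy before removing so the collected entry does not depend on the map's internals.
                toRemove.add(new AbstractMap.SimpleEntry<Long, S>(entry));
                entryIterator.remove();
            } else if (breakOnMatch) {
                // We can stop here because the map is sorted.
                break;
            }
        }
        return toRemove;
    }

    public static void main(String[] args) {
        SortedMap<Long, String> stored = new TreeMap<>();
        for (long id = 1; id <= 4; id++) {
            stored.put(id, "state-" + id);
        }
        System.out.println("removed: " + pruneCheckpoints(stored, 3L, true));
        System.out.println("remaining: " + stored.keySet());
    }
}
```

One thing to note about this sketch: with `breakOnMatch == true` and a `checkpointID` that is not present in the map, the `!=` comparison never hits the `else` branch, so every entry is removed. That differs from a `<` comparison and may need a guard.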
---