Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5578#discussion_r170965755
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
---
@@ -126,30 +133,54 @@ public void storeLocalState(
LOG.info("Storing local state for checkpoint {}.",
checkpointId);
LOG.debug("Local state for checkpoint {} is {}.", checkpointId,
localState);
- Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
+ Map.Entry<Long, TaskStateSnapshot> toDiscard = null;
synchronized (lock) {
if (disposed) {
// we ignore late stores and simply discard the
state.
- toDiscard.put(checkpointId, localState);
+ toDiscard = new AbstractMap.SimpleEntry<Long,
TaskStateSnapshot>(checkpointId, localState);
} else {
TaskStateSnapshot previous =
storedTaskStateByCheckpointID.put(checkpointId, localState);
if (previous != null) {
- toDiscard.put(checkpointId, previous);
+ toDiscard = new
AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, previous);
}
}
}
- asyncDiscardLocalStateForCollection(toDiscard.entrySet());
+ if (toDiscard != null) {
+
asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
+ }
}
@Override
@Nullable
public TaskStateSnapshot retrieveLocalState(long checkpointID) {
synchronized (lock) {
TaskStateSnapshot snapshot =
storedTaskStateByCheckpointID.get(checkpointID);
+
+ Iterator<Map.Entry<Long, TaskStateSnapshot>>
entryIterator =
+
storedTaskStateByCheckpointID.entrySet().iterator();
+
+ if (retrieveWithDiscard) {
+ // Only the TaskStateSnapshot.checkpointID ==
checkpointID is useful, we remove the others
+ final List<Map.Entry<Long, TaskStateSnapshot>>
toRemove = new ArrayList<>();
--- End diff --
👍 addressed.
---