GitHub user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5578#discussion_r170953600
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java ---
    @@ -126,30 +133,54 @@ public void storeLocalState(
                LOG.info("Storing local state for checkpoint {}.", checkpointId);
                LOG.debug("Local state for checkpoint {} is {}.", checkpointId, localState);
     
    -           Map<Long, TaskStateSnapshot> toDiscard = new HashMap<>(16);
    +           Map.Entry<Long, TaskStateSnapshot> toDiscard = null;
     
                synchronized (lock) {
                        if (disposed) {
                                // we ignore late stores and simply discard the state.
    -                           toDiscard.put(checkpointId, localState);
    +                           toDiscard = new AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, localState);
                        } else {
                                TaskStateSnapshot previous =
                                        storedTaskStateByCheckpointID.put(checkpointId, localState);
     
                                if (previous != null) {
    -                                   toDiscard.put(checkpointId, previous);
    +                                   toDiscard = new AbstractMap.SimpleEntry<Long, TaskStateSnapshot>(checkpointId, previous);
                                }
                        }
                }
     
    -           asyncDiscardLocalStateForCollection(toDiscard.entrySet());
    +           if (toDiscard != null) {
    +                   asyncDiscardLocalStateForCollection(Collections.singletonList(toDiscard));
    +           }
        }
     
        @Override
        @Nullable
        public TaskStateSnapshot retrieveLocalState(long checkpointID) {
                synchronized (lock) {
                        TaskStateSnapshot snapshot = storedTaskStateByCheckpointID.get(checkpointID);
    +
    +                   Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
    +                           storedTaskStateByCheckpointID.entrySet().iterator();
    +
    +                   if (retrieveWithDiscard) {
    +                           // Only the TaskStateSnapshot.checkpointID == checkpointID is useful, we remove the others
    +                           final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new ArrayList<>();
    --- End diff --
    
    I think we can de-duplicate this by extracting the pruning loop from 
`confirmCheckpoint(...)` into a shared method, perhaps called 
`pruneCheckpoints(...)`. Both use cases can then use the same comparison, 
`entryCheckpointId != checkpointID`, with a boolean parameter that determines 
whether the iteration breaks in the `else` case or continues.
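
    For illustration, a minimal sketch of how such a shared helper might look. It is not the code from the PR: the class name `PruneSketch`, the generic value type (standing in for `TaskStateSnapshot`), and the parameter name `breakOnMatch` are hypothetical, and it assumes the map iterates in ascending checkpoint ID order. In the real class the loop would presumably run under `lock`, and the returned list would be handed to `asyncDiscardLocalStateForCollection(...)`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class PruneSketch {

    /**
     * Removes entries whose key differs from checkpointID and returns them.
     * If breakOnMatch is true, iteration stops at the matching entry, so
     * entries after it are kept; otherwise every non-matching entry is removed.
     */
    static <V> List<Map.Entry<Long, V>> pruneCheckpoints(
            SortedMap<Long, V> stored, long checkpointID, boolean breakOnMatch) {

        List<Map.Entry<Long, V>> toRemove = new ArrayList<>();
        Iterator<Map.Entry<Long, V>> entryIterator = stored.entrySet().iterator();

        while (entryIterator.hasNext()) {
            Map.Entry<Long, V> entry = entryIterator.next();
            long entryCheckpointId = entry.getKey();

            if (entryCheckpointId != checkpointID) {
                // Copy the entry before removing it: the behavior of a map entry
                // is undefined once the backing map has been modified.
                toRemove.add(new AbstractMap.SimpleEntry<>(entry));
                entryIterator.remove();
            } else if (breakOnMatch) {
                break;
            }
        }
        return toRemove;
    }

    public static void main(String[] args) {
        SortedMap<Long, String> stored = new TreeMap<>();
        for (long id = 1; id <= 4; id++) {
            stored.put(id, "state-" + id);
        }
        // Confirm-style pruning: drop entries before checkpoint 3, keep 3 and later.
        System.out.println(pruneCheckpoints(stored, 3L, true));
        System.out.println(stored.keySet());
    }
}
```

    With `breakOnMatch == true` this behaves like the confirm case (older entries are dropped, the matching and later ones stay); with `false` it behaves like retrieve-with-discard (only the matching entry stays).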

