pnowojski commented on a change in pull request #10332:
[FLINK-13905][checkpointing] Separate checkpoint triggering into several asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363220654
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##########
 @@ -957,17 +953,19 @@ private void rememberRecentCheckpointId(long id) {
        }
 
        private void dropSubsumedCheckpoints(long checkpointId) {
-               Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
-
-               while (entries.hasNext()) {
-                       PendingCheckpoint p = entries.next().getValue();
-                       // remove all pending checkpoints that are lesser than the current completed checkpoint
-                       if (p.getCheckpointId() < checkpointId && p.canBeSubsumed()) {
-                               rememberRecentCheckpointId(p.getCheckpointId());
-                               failPendingCheckpoint(p, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
-                               entries.remove();
-                       }
-               }
+               PendingCheckpoint[] checkpointsToSubsume =
+                       pendingCheckpoints
+                               .values()
+                               .stream()
+                               .filter(
+                                       pendingCheckpoint ->
+                                               pendingCheckpoint.getCheckpointId() < checkpointId &&
+                                               pendingCheckpoint.canBeSubsumed())
+                               .toArray(PendingCheckpoint[]::new);
+
+               abortPendingCheckpoints(
+                       checkpointsToSubsume,
+                       new CheckpointException(CheckpointFailureReason.CHECKPOINT_SUBSUMED));
 
 Review comment:
   Yes, but wouldn't this problem be avoided by inlining
`abortPendingCheckpoints(PendingCheckpoint[] checkpoints, CheckpointException exception)`
and sticking to `iterator()` and `Iterator#remove()`, as before? Admittedly,
that would force you to provide an
`abortPendingCheckpoint(Iterator<PendingCheckpoint>, ...)` overload.
   
   Alternatively, you could provide
`abortPendingCheckpoints(Predicate<PendingCheckpoint> checkpointToDeletePredicate, ...)`,
which hides and deduplicates the array-copying logic. That is probably the
better option.
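
   To illustrate the `Predicate`-based alternative, here is a minimal sketch.
It does not use the real Flink classes: `PendingCheckpointSketch` and
`CoordinatorSketch` are hypothetical stand-ins, the failure reason is a plain
`String` instead of a `CheckpointException`, and it assumes that aborting a
single checkpoint removes it from the `pendingCheckpoints` map, which is why
the matching entries are copied before any of them is aborted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for PendingCheckpoint (not the Flink class).
class PendingCheckpointSketch {
    private final long checkpointId;
    private final boolean subsumable;
    private boolean aborted;

    PendingCheckpointSketch(long checkpointId, boolean subsumable) {
        this.checkpointId = checkpointId;
        this.subsumable = subsumable;
    }

    long getCheckpointId() { return checkpointId; }
    boolean canBeSubsumed() { return subsumable; }
    boolean isAborted() { return aborted; }
    void abort(String reason) { aborted = true; }
}

// Hypothetical, simplified stand-in for the coordinator (not the Flink class).
class CoordinatorSketch {
    final Map<Long, PendingCheckpointSketch> pendingCheckpoints = new LinkedHashMap<>();

    // The copy lives in exactly one place: callers only pass a predicate.
    void abortPendingCheckpoints(
            Predicate<PendingCheckpointSketch> checkpointToAbortPredicate, String reason) {
        // Collect first, so that removing entries below does not modify
        // the map while we are still iterating over it.
        List<PendingCheckpointSketch> toAbort = new ArrayList<>();
        for (PendingCheckpointSketch p : pendingCheckpoints.values()) {
            if (checkpointToAbortPredicate.test(p)) {
                toAbort.add(p);
            }
        }
        for (PendingCheckpointSketch p : toAbort) {
            abortPendingCheckpoint(p, reason);
        }
    }

    // Assumption: aborting a single checkpoint also removes it from the map.
    void abortPendingCheckpoint(PendingCheckpointSketch p, String reason) {
        p.abort(reason);
        pendingCheckpoints.remove(p.getCheckpointId());
    }

    void dropSubsumedCheckpoints(long checkpointId) {
        abortPendingCheckpoints(
            p -> p.getCheckpointId() < checkpointId && p.canBeSubsumed(),
            "CHECKPOINT_SUBSUMED");
    }
}
```

   With this shape, `dropSubsumedCheckpoints` shrinks to a single call, and any
other caller that aborts a subset of pending checkpoints can reuse the same
method with a different predicate.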

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
