zhijiangW commented on a change in pull request #12478:
URL: https://github.com/apache/flink/pull/12478#discussion_r437939827



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -221,9 +232,14 @@ public void checkpointState(
                // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                // checkpoint alignments
 
+               if (lastCheckpointId >= metadata.getCheckpointId()) {

Review comment:
       In addition, the semantics of the different cases are now a bit hard 
to follow. Previously `lastCheckpointId` only reflected checkpoints that had 
already happened, so we could rely on `lastCheckpointId >= metadata.getCheckpointId()` 
to discard any checkpoint with a smaller id that arrived afterwards, based on 
the guarantee that checkpoint ids only increase.
   
   Now `lastCheckpointId` also reflects the canceled id from 
`CheckpointBarrierHandler`. So if the handler notifies us to abort checkpoint 10, 
do we also need to ignore checkpoint 9? If so, this differs from the abort case 
via RPC: if an RPC notifies us to abort checkpoint 10, checkpoint 9 can still 
execute afterwards, because we store the ids aborted via RPC in the 
`abortedCheckpointIds` set.
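
   To make the difference concrete, here is a minimal sketch of the two behaviours. It is not the actual `SubtaskCheckpointCoordinatorImpl` code: the class names `AbortSemanticsSketch`, `WatermarkTracker` and `AbortedSetTracker`, and the methods `abort` and `shouldSkip`, are hypothetical and exist only for illustration. The assumption is that the handler path raises `lastCheckpointId`, while the RPC path only records the exact id.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch, not the Flink implementation: it only contrasts the two
// abort-tracking strategies discussed in this comment.
final class AbortSemanticsSketch {

    // Handler-style abort (assumed): a single high-water mark.
    // Every checkpoint id at or below the mark is skipped.
    static final class WatermarkTracker {
        private long lastCheckpointId = -1L;

        void abort(long checkpointId) {
            lastCheckpointId = Math.max(lastCheckpointId, checkpointId);
        }

        boolean shouldSkip(long checkpointId) {
            return lastCheckpointId >= checkpointId;
        }
    }

    // RPC-style abort (assumed): remember exactly which ids were aborted.
    // Smaller ids that were never aborted are still allowed to run.
    static final class AbortedSetTracker {
        private final Set<Long> abortedCheckpointIds = new HashSet<>();

        void abort(long checkpointId) {
            abortedCheckpointIds.add(checkpointId);
        }

        boolean shouldSkip(long checkpointId) {
            return abortedCheckpointIds.contains(checkpointId);
        }
    }
}
```

   With this sketch, aborting checkpoint 10 through `WatermarkTracker` also skips checkpoint 9, whereas aborting 10 through `AbortedSetTracker` leaves checkpoint 9 free to execute. That is the inconsistency I would like us to either remove or document.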




