zhijiangW commented on a change in pull request #12478:
URL: https://github.com/apache/flink/pull/12478#discussion_r437939827
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -221,9 +232,14 @@ public void checkpointState(
// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
// checkpoint alignments
+ if (lastCheckpointId >= metadata.getCheckpointId()) {
Review comment:
In addition, the semantics of the different cases are now a bit hard to
follow. Previously, `lastCheckpointId` only reflected checkpoints that had
already happened, so we could rely on `lastCheckpointId >= metadata.getCheckpointId()`
to discard any later-arriving checkpoint with a smaller id, based on the
guarantee that checkpoint ids only increase.
Now `lastCheckpointId` also reflects the canceled id from
`CheckpointBarrierHandler`. So if the handler notifies us to abort checkpoint 10,
should we also ignore checkpoint 9? If so, this differs from the abort case
via RPC: if an RPC notifies us to abort checkpoint 10, checkpoint 9 can still
execute afterwards, because we store the ids aborted via RPC in the
`abortedCheckpointIds` set.
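
To make the asymmetry concrete, here is a minimal sketch of the two abort paths as I understand them. It is not the actual `SubtaskCheckpointCoordinatorImpl` code: the class and the methods `abortFromBarrierHandler`, `abortFromRpc` and `tryCheckpoint` are hypothetical names for illustration, and only `lastCheckpointId` and `abortedCheckpointIds` come from the PR.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the two abort paths; not the real coordinator. */
class AbortSemanticsSketch {
    // Acts as a watermark: any checkpoint id <= this value is discarded.
    private long lastCheckpointId = -1L;
    // Remembers individual ids aborted via RPC.
    private final Set<Long> abortedCheckpointIds = new HashSet<>();

    /** Assumed handler path: the canceled id raises the watermark. */
    public void abortFromBarrierHandler(long checkpointId) {
        lastCheckpointId = Math.max(lastCheckpointId, checkpointId);
    }

    /** Assumed RPC path: only this single id is remembered. */
    public void abortFromRpc(long checkpointId) {
        if (checkpointId > lastCheckpointId) {
            abortedCheckpointIds.add(checkpointId);
        }
    }

    /** Returns true if the checkpoint would actually be executed. */
    public boolean tryCheckpoint(long checkpointId) {
        if (lastCheckpointId >= checkpointId) {
            return false; // discarded by the watermark check
        }
        lastCheckpointId = checkpointId;
        // An id aborted via RPC is skipped exactly once and then forgotten.
        return !abortedCheckpointIds.remove(checkpointId);
    }

    public static void main(String[] args) {
        AbortSemanticsSketch viaHandler = new AbortSemanticsSketch();
        viaHandler.abortFromBarrierHandler(10);
        System.out.println("handler abort 10, run 9: " + viaHandler.tryCheckpoint(9));

        AbortSemanticsSketch viaRpc = new AbortSemanticsSketch();
        viaRpc.abortFromRpc(10);
        System.out.println("rpc abort 10, run 9: " + viaRpc.tryCheckpoint(9));
    }
}
```

Under these assumptions, aborting 10 through the handler also suppresses checkpoint 9, while aborting 10 through RPC suppresses only checkpoint 10 itself.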