zhijiangW commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r435200114



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -154,46 +154,51 @@ public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex)
        @Override
        public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
                long cancelledId = cancelBarrier.getCheckpointId();
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, cancelledId);
-               }
-
-               if (currentConsumedCheckpointId >= cancelledId && !isCheckpointPending()) {
-                       return;
-               }
+               // tag whether we should abort checkpoint from task thread view
+               boolean shouldAbort1 = false;
 
-               if (isCheckpointPending()) {
+               if (cancelledId > currentConsumedCheckpointId) {
+                       currentConsumedCheckpointId = cancelledId;
+                       shouldAbort1 = true;
+               } else if (cancelledId == currentConsumedCheckpointId && isCheckpointPending()) {
                        LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
                                        "Skipping current checkpoint.",
                                taskName,
                                cancelledId,
                                currentConsumedCheckpointId);
 
                        resetConsumedBarriers();
+                       shouldAbort1 = true;
+               }
+
+               // tag whether we should abort checkpoint from threadSafeUnaligner view
+               boolean shouldAbort2 = threadSafeUnaligner.setCancelledCheckpointId(cancelledId);

Review comment:
       > If the mailbox action hasn't yet been executed, we could mark this 
checkpoint as aborted here, in this method (task thread), and prevent mail from 
executing (task thread).
   
   Yes, we already do that in this PR. If the cancelled checkpoint id is larger than the current id, the current id is updated in both `CheckpointBarrierUnaligner` and `ThreadSafeUnaligner`. When the triggered checkpoint is about to run as a mailbox action, it exits directly after comparing the triggered id with the current id.
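   A minimal sketch of this early-exit idea, under simplifying assumptions. Instead of the single current id kept in `CheckpointBarrierUnaligner` and `ThreadSafeUnaligner`, it tracks only the highest cancelled id in an `AtomicLong`, and the mailbox is modelled as a plain queue. The names `MailboxSkipSketch`, `cancel`, `enqueueTriggered` and `drainMailbox` are hypothetical and are not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

class MailboxSkipSketch {
    // Hypothetical: highest checkpoint id known to be cancelled, readable from any thread.
    private final AtomicLong highestCancelledId = new AtomicLong(-1L);
    // Stand-in for the task mailbox: the netty thread enqueues, only the task thread drains.
    private final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    // Checkpoints that actually ran; touched only by the task thread.
    private final List<Long> executed = new ArrayList<>();

    /** Task thread, on a cancellation barrier: returns true only if the id moved forward. */
    boolean cancel(long cancelledId) {
        // Monotonic update: a smaller or duplicate id never moves the value backwards.
        long previous = highestCancelledId.getAndAccumulate(cancelledId, Math::max);
        return cancelledId > previous;
    }

    /** Netty thread, on the first barrier of a checkpoint: defer the work to the task thread. */
    void enqueueTriggered(long checkpointId) {
        mailbox.add(() -> {
            // Exit directly if the checkpoint was cancelled while the mail was waiting.
            if (checkpointId <= highestCancelledId.get()) {
                return;
            }
            executed.add(checkpointId);
        });
    }

    /** Task thread: run every pending mailbox action in FIFO order. */
    void drainMailbox() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    List<Long> executed() {
        return executed;
    }

    public static void main(String[] args) {
        MailboxSkipSketch s = new MailboxSkipSketch();
        s.enqueueTriggered(1);            // netty thread sees checkpoint 1
        s.enqueueTriggered(2);            // netty thread sees checkpoint 2
        System.out.println(s.cancel(1));  // cancellation for 1 is processed before the mail runs
        System.out.println(s.cancel(1));  // duplicate cancellation
        s.drainMailbox();
        System.out.println(s.executed());
    }
}
```

   Running `main` prints `true`, `false`, then `[2]`. The mail for checkpoint 1 is still drained, but it returns before doing any work. The boolean returned by `cancel` loosely mirrors the `shouldAbort2` flag in the diff, although the real semantics of `setCancelledCheckpointId` may differ.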
   
   > If the mailbox action has already executed (task thread), it could have 
left the currentConsumedCheckpointId field already up to date, so this method 
would already know whether to abort or not from shouldAbort1.
   
   Not always. If checkpoint 1 is triggered by the netty thread and then executed by the task thread, that does not mean the task thread has already processed the corresponding barrier, so `currentConsumedCheckpointId` might still be out of date when checkpoint 1 is executed.
   
   During the asynchronous phase of the checkpoint, the task thread may continue processing input from other channels, and it may then process a cancellation barrier that aborts the currently executing checkpoint.
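   A rough sketch of this second case, assuming the asynchronous part runs as a `Future` on a separate executor. Flink's real asynchronous checkpoint machinery is more involved, and the names `AsyncAbortSketch`, `startAsyncPart` and `abort` are hypothetical. The task thread hands off the slow part, keeps running, and cancels the in-flight work when it later reads a cancellation barrier from another channel.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncAbortSketch {
    // Hypothetical stand-in for the pool that runs the asynchronous part of a checkpoint.
    private final ExecutorService asyncPool = Executors.newSingleThreadExecutor();
    // Asynchronous parts still in flight, keyed by checkpoint id.
    private final Map<Long, Future<?>> inFlight = new ConcurrentHashMap<>();

    /** Task thread: after the synchronous part, hand the slow part to the pool. */
    void startAsyncPart(long checkpointId, CountDownLatch started) {
        inFlight.put(checkpointId, asyncPool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000); // simulated long-running persist/upload step
                inFlight.remove(checkpointId);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // aborted: stop working
            }
        }));
    }

    /** Task thread: a cancellation barrier read from another channel aborts the in-flight part. */
    boolean abort(long checkpointId) {
        Future<?> f = inFlight.remove(checkpointId);
        return f != null && f.cancel(true);
    }

    void close() {
        asyncPool.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncAbortSketch s = new AsyncAbortSketch();
        CountDownLatch started = new CountDownLatch(1);
        s.startAsyncPart(1, started);
        started.await();                 // checkpoint 1 is now in its asynchronous phase
        System.out.println(s.abort(1));  // the running part is cancelled
        System.out.println(s.abort(1));  // nothing left to abort
        s.close();
    }
}
```

   Running `main` prints `true` then `false`. `Future.cancel(true)` interrupts the sleeping worker, and the second call finds nothing left to abort.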
   
   > Or is it about aborting the checkpoint after notifyBarrierReceived from 
netty thread, but before enqueued mailbox action was executed?
   
   Yes, it covers this case as well. In short, it can abort both an unexecuted checkpoint that was already enqueued into the mailbox and an ongoing checkpoint that might be in its asynchronous phase.



