zhijiangW commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r434686880



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -154,46 +154,51 @@ public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex)
        @Override
        public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
                long cancelledId = cancelBarrier.getCheckpointId();
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, cancelledId);
-               }
-
-               if (currentConsumedCheckpointId >= cancelledId && !isCheckpointPending()) {
-                       return;
-               }
+               // tag whether we should abort checkpoint from task thread view
+               boolean shouldAbort1 = false;
 
-               if (isCheckpointPending()) {
+               if (cancelledId > currentConsumedCheckpointId) {
+                       currentConsumedCheckpointId = cancelledId;
+                       shouldAbort1 = true;
+               } else if (cancelledId == currentConsumedCheckpointId && isCheckpointPending()) {
                        LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
                                        "Skipping current checkpoint.",
                                taskName,
                                cancelledId,
                                currentConsumedCheckpointId);
 
                        resetConsumedBarriers();
+                       shouldAbort1 = true;
+               }
+
+               // tag whether we should abort checkpoint from threadSafeUnaligner view
+               boolean shouldAbort2 = threadSafeUnaligner.setCancelledCheckpointId(cancelledId);

Review comment:
       A checkpoint can be triggered either by the netty thread via 
`notifyBarrierReceived` or by the task thread via `processBarrier`, and under 
a race we cannot know which of the two happens first. We therefore need to 
compare both `currentReceivedCheckpointId` and `currentConsumedCheckpointId` 
with the cancelled id to abort the checkpoint properly.
   
   If `shouldAbort2` is true and `shouldAbort1` is false, the 
`notifyBarrierReceived` call triggered by the netty thread happened first. 
E.g. the netty thread receives ch2 from channel1, the task thread processes 
ch1 from channel2, and then the task thread processes the cancellation of ch2 
from channel3. In this case, `shouldAbort1` will be false and `shouldAbort2` 
will be true.
   
   As long as either tag is true, we should abort the checkpoint.
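
   To make the intent concrete, here is a minimal, standalone sketch of how 
the two tags could be combined. It is not the actual Flink code: 
`TaskThreadView`, `NettyThreadView`, `processCancellation` and the 
`numBarriersReceived` counter are hypothetical names, and the body of 
`setCancelledCheckpointId` is only a simplified guess, since the diff above 
does not show it. The point is just that both views are always evaluated and 
the checkpoint is aborted if either one asks for it.

```java
// Simplified model of the two "views"; all class and helper names are illustrative only.
final class AbortTagSketch {

    /** Task-thread view, following the branch structure of the diff above. */
    static final class TaskThreadView {
        private long currentConsumedCheckpointId;
        private boolean checkpointPending;

        TaskThreadView(long currentConsumedCheckpointId, boolean checkpointPending) {
            this.currentConsumedCheckpointId = currentConsumedCheckpointId;
            this.checkpointPending = checkpointPending;
        }

        boolean shouldAbort(long cancelledId) {
            if (cancelledId > currentConsumedCheckpointId) {
                currentConsumedCheckpointId = cancelledId;
                return true;
            } else if (cancelledId == currentConsumedCheckpointId && checkpointPending) {
                checkpointPending = false; // stands in for resetConsumedBarriers()
                return true;
            }
            return false;
        }
    }

    /** Netty-thread view. ASSUMPTION: the real setCancelledCheckpointId may differ. */
    static final class NettyThreadView {
        private long currentReceivedCheckpointId;
        private int numBarriersReceived;

        NettyThreadView(long currentReceivedCheckpointId, int numBarriersReceived) {
            this.currentReceivedCheckpointId = currentReceivedCheckpointId;
            this.numBarriersReceived = numBarriersReceived;
        }

        synchronized boolean setCancelledCheckpointId(long cancelledId) {
            boolean stale = cancelledId < currentReceivedCheckpointId;
            boolean nothingInFlight =
                cancelledId == currentReceivedCheckpointId && numBarriersReceived == 0;
            if (stale || nothingInFlight) {
                return false;
            }
            currentReceivedCheckpointId = cancelledId;
            numBarriersReceived = 0;
            return true;
        }
    }

    /** Evaluates both tags (each call updates its own state), then combines them. */
    static boolean processCancellation(TaskThreadView task, NettyThreadView netty, long cancelledId) {
        boolean shouldAbort1 = task.shouldAbort(cancelledId);
        boolean shouldAbort2 = netty.setCancelledCheckpointId(cancelledId);
        return shouldAbort1 || shouldAbort2;
    }

    public static void main(String[] args) {
        // Only the netty view still tracks checkpoint 2: prints true
        System.out.println(processCancellation(
            new TaskThreadView(2L, false), new NettyThreadView(2L, 1), 2L));
        // Only the task view is behind the cancelled id: prints true
        System.out.println(processCancellation(
            new TaskThreadView(1L, false), new NettyThreadView(3L, 1), 2L));
        // Neither view has anything left to abort: prints false
        System.out.println(processCancellation(
            new TaskThreadView(2L, false), new NettyThreadView(2L, 0), 2L));
    }
}
```

   Note that both tags are computed before they are combined. Writing 
`task.shouldAbort(id) || netty.setCancelledCheckpointId(id)` directly would 
short-circuit and skip the state update on the second view.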





