pnowojski commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r434542740



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -409,5 +416,15 @@ synchronized long getCurrentCheckpointId() {
                boolean isCheckpointPending() {
                        return numBarriersReceived > 0;
                }
+
+               private void resetReceivedBarriers() {
+                       Arrays.fill(storeNewBuffers, false);
+                       numBarriersReceived = 0;
+               }
+
+               private void notifyAbort(CheckpointException exception) throws IOException {
+                       long currentCheckpointId = currentReceivedCheckpointId;
+                       handler.executeInTaskThread(() -> handler.notifyAbort(currentCheckpointId, exception), "notifyAbort");
+               }

Review comment:
       I'm not sure this works properly, and even if it does, I'm not sure it's a good way to solve the problem.
   
   As far as I understand, it works like this:
   1. We trigger the checkpoint from the netty thread and enqueue `notifyCheckpoint` in the mailbox.
   2. We receive a channel closed/end of partition event before `notifyCheckpoint` starts executing in the task thread from the mailbox. This leads us to this method, which enqueues `notifyAbort` after `notifyCheckpoint` in the mailbox.
   3. We still execute the `notifyCheckpoint` callback and trigger the checkpoint, even though it was already (partially?) aborted (for example, `resetReceivedBarriers()` has already been called).
   4. `notifyAbort` cleans up the checkpoint started in step 3.
   
   Besides being quite complicated and hard to reason about, I'm not sure it's correct, or what the side effects of doing this in so many stages could be. It may also waste resources on starting a checkpoint that we already know will not happen.
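
   To make the ordering in the four steps above concrete, here is a minimal sketch. It is not Flink code: `MailboxOrderingSketch` and its queue are hypothetical stand-ins for the task mailbox, and the recorded strings only show which callback ran and in what order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the task mailbox; this is not Flink's mailbox implementation.
class MailboxOrderingSketch {
    final Queue<Runnable> mailbox = new ArrayDeque<>();
    final List<String> executed = new ArrayList<>();

    // Step 1: the netty thread sees the barrier and enqueues the trigger.
    void onBarrier(long checkpointId) {
        mailbox.add(() -> executed.add("notifyCheckpoint(" + checkpointId + ")"));
    }

    // Step 2: a channel closes before the mail above has run,
    // so the abort is queued behind the trigger.
    void onChannelClosed(long checkpointId) {
        mailbox.add(() -> executed.add("notifyAbort(" + checkpointId + ")"));
    }

    // Steps 3 and 4: the task thread drains the mailbox in FIFO order.
    List<String> drain() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
        return executed;
    }
}
```

   Calling `onBarrier(5)`, then `onChannelClosed(5)`, then `drain()` runs the trigger first and the abort second, which is the start-then-clean-up sequence I'm worried about.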
   
   Why can't it work like this:
   1. As before.
   2. We receive a channel closed/end of partition event before `notifyCheckpoint` starts executing in the task thread from the mailbox. We mark the correct checkpoints as cancelled (by bumping the cancelled/current checkpoint ids) in the `ThreadSafeUnaligner` and abort the checkpoint immediately (assuming we are in the task thread, which I think we always are).
   3. If `notifyCheckpoint` starts executing, it should check that the checkpoint it's supposed to notify wasn't cancelled.
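
   A rough sketch of the alternative in steps 1 to 3 above, under stated assumptions: `CancelBeforeNotifySketch`, `lastCancelledCheckpointId`, and the string log are hypothetical and do not match the real `ThreadSafeUnaligner` fields. It also assumes the channel-closed event is handled in the task thread, so the abort can happen on the spot.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the proposal; names are illustrative, not Flink's.
class CancelBeforeNotifySketch {
    final Queue<Runnable> mailbox = new ArrayDeque<>();
    final List<String> executed = new ArrayList<>();
    private long lastCancelledCheckpointId = -1L;

    // Step 1: enqueue the trigger, as before.
    synchronized void onBarrier(long checkpointId) {
        mailbox.add(() -> notifyCheckpoint(checkpointId));
    }

    // Step 2: record the cancellation and abort right away
    // (assumption: we are already in the task thread here).
    synchronized void onChannelClosed(long checkpointId) {
        lastCancelledCheckpointId = Math.max(lastCancelledCheckpointId, checkpointId);
        executed.add("abort(" + checkpointId + ")");
    }

    // Step 3: the queued mail re-checks whether its checkpoint
    // was cancelled in the meantime and skips the trigger if so.
    private synchronized void notifyCheckpoint(long checkpointId) {
        if (checkpointId > lastCancelledCheckpointId) {
            executed.add("trigger(" + checkpointId + ")");
        }
    }

    // The task thread drains the mailbox in FIFO order.
    List<String> drain() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
        return executed;
    }
}
```

   With this shape, a checkpoint that was cancelled before its mail ran is never started, while a later checkpoint with a higher id still triggers normally.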

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -154,46 +154,51 @@ public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex)
        @Override
        public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
                long cancelledId = cancelBarrier.getCheckpointId();
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, cancelledId);
-               }

Review comment:
       Have we lost this log message?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -117,17 +117,6 @@
                threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, checkNotNull(channelStateWriter), this);
        }
 
-       @Override
-       public void releaseBlocksAndResetBarriers() {
-               if (isCheckpointPending()) {
-                       // make sure no additional data is persisted
-                       Arrays.fill(hasInflightBuffers, false);
-                       // the next barrier that comes must assume it is the first
-                       numBarrierConsumed = 0;
-               }
-               threadSafeUnaligner.resetReceivedBarriers(currentConsumedCheckpointId);
-       }
-

Review comment:
       Are you sure this is a safe delete? `AlternatingCheckpointBarrierHandler`, for example, uses this method to clean up the state of the previous handler before switching to another.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -154,46 +154,51 @@ public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex)
        @Override
        public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
                long cancelledId = cancelBarrier.getCheckpointId();
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, cancelledId);
-               }
-
-               if (currentConsumedCheckpointId >= cancelledId && !isCheckpointPending()) {
-                       return;
-               }
+               // tag whether we should abort checkpoint from task thread view
+               boolean shouldAbort1 = false;
 
-               if (isCheckpointPending()) {
+               if (cancelledId > currentConsumedCheckpointId) {
+                       currentConsumedCheckpointId = cancelledId;
+                       shouldAbort1 = true;
+               } else if (cancelledId == currentConsumedCheckpointId && isCheckpointPending()) {
                        LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
                                        "Skipping current checkpoint.",
                                taskName,
                                cancelledId,
                                currentConsumedCheckpointId);
 
                        resetConsumedBarriers();
+                       shouldAbort1 = true;
+               }
+
+               // tag whether we should abort checkpoint from threadSafeUnaligner view
+               boolean shouldAbort2 = threadSafeUnaligner.setCancelledCheckpointId(cancelledId);

Review comment:
       I don't fully understand this `boolean shouldAbort2`. What are the conditions when it's set to true while `shouldAbort1` is false? Why do we have those two sources of truth?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -387,12 +378,28 @@ synchronized void resetReceivedBarriers(long checkpointId) {
                        return allBarriersReceivedFuture;
                }
 
-               synchronized void onChannelClosed() {
+               synchronized boolean onChannelClosed() throws IOException {
                        numOpenChannels--;
+
+                       if (numBarriersReceived > 0) {
+                               resetReceivedBarriers();
+                               notifyAbort(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+                               return true;
+                       }
+                       return false;
                }
 
-               synchronized void setCurrentReceivedCheckpointId(long currentReceivedCheckpointId) {
-                       this.currentReceivedCheckpointId = Math.max(currentReceivedCheckpointId, this.currentReceivedCheckpointId);
+               synchronized boolean setCancelledCheckpointId(long canceledCheckpointId) {
+                       boolean shouldAbort = false;
+                       if (canceledCheckpointId > currentReceivedCheckpointId) {
+                               currentReceivedCheckpointId = canceledCheckpointId;
+                               shouldAbort = true;
+
+                       } else if (canceledCheckpointId == currentReceivedCheckpointId && isCheckpointPending()) {
+                               resetReceivedBarriers();
+                               shouldAbort = true;
+                       }
+                       return shouldAbort;

Review comment:
       I think something is missing in `notifyCheckpoint`:
   ```
        private void notifyCheckpoint(CheckpointBarrier barrier) throws IOException {
                // ignore the previous triggered checkpoint by netty thread if it was already canceled or aborted before.
                if (barrier.getId() >= threadSafeUnaligner.getCurrentCheckpointId()) {
                        super.notifyCheckpoint(barrier, 0);
                }
        }
   ```
   As it is now, after cancelling/closing the checkpoint, the above method would still trigger the checkpoint, as `barrier.getId()` would be equal to `currentReceivedCheckpointId`.
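
   A reduced model of why the `>=` guard does not filter out a cancelled checkpoint. `setCancelledCheckpointId` follows the shape of the method in the diff; `NotifyGuardSketch`, `onBarrier`, and `guardAllowsTrigger` are hypothetical simplifications, and how the real code updates the id and barrier count on a barrier is an assumption here.

```java
// Hypothetical reduction of the state involved; not the real ThreadSafeUnaligner.
class NotifyGuardSketch {
    long currentReceivedCheckpointId = 0L;
    int numBarriersReceived = 0;

    // Assumed simplification: a barrier bumps the current id and the barrier count.
    void onBarrier(long checkpointId) {
        currentReceivedCheckpointId = Math.max(currentReceivedCheckpointId, checkpointId);
        numBarriersReceived++;
    }

    // Same branching as setCancelledCheckpointId in the diff.
    boolean setCancelledCheckpointId(long cancelledId) {
        if (cancelledId > currentReceivedCheckpointId) {
            currentReceivedCheckpointId = cancelledId;
            return true;
        } else if (cancelledId == currentReceivedCheckpointId && numBarriersReceived > 0) {
            numBarriersReceived = 0; // stands in for resetReceivedBarriers()
            return true;
        }
        return false;
    }

    // The comparison used by the notifyCheckpoint guard quoted above.
    boolean guardAllowsTrigger(long barrierId) {
        return barrierId >= currentReceivedCheckpointId;
    }
}
```

   In this model, after `onBarrier(7)` and `setCancelledCheckpointId(7)`, the current id is still 7, so `guardAllowsTrigger(7)` returns true and the already-queued `notifyCheckpoint` would go ahead. A strict `>` comparison or a separate cancelled id would be needed to reject it.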





