dawidwys commented on a change in pull request #15313:
URL: https://github.com/apache/flink/pull/15313#discussion_r603328329



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerAction.java
##########
@@ -74,9 +58,10 @@ public final BarrierHandlerAction barrierReceived(
             context.triggerGlobalCheckpoint(checkpointBarrier);
             state.unblockAllChannels();
             return new AlternatingWaitingForFirstBarrier(state.getInputs());
-        } else if (context.isTimedOut(checkpointBarrier)
-                || 
checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint()) {
-            return alignmentTimeout(context, checkpointBarrier);
+        } else if (context.isTimedOut(checkpointBarrier)) {
+            state.removeFromBlocked(channelInfo);

Review comment:
       Yes, that is tricky. Let me try to explain.
   
   1. We call `state.blockChannel(channelInfo)` in line 58, so this channel 
is assumed blocked.
   2. In `alignmentTimeout` we unblock all channels that we assume blocked 
(including the current channel from the step above).
   3. We pass the current barrier + channel to the result of 
`state.alignmentTimeout`. The barrier is aligned, therefore that state will try 
to unblock the channel.
   4. If we do not remove the current channel from the list of channels we 
assume blocked, then we will try to unblock the channel twice (once in 
`alignmentTimeout`, a second time in `barrierReceived` on the result of 
`alignmentTimeout`).
   
   In other words, you can read it as rolling back the changes made by the 
current barrier, because the barrier will be processed by the intermediate 
state created by `alignmentTimeout`.
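   
   To make the double unblock concrete, here is a minimal toy model of steps 
1-3. It is only a sketch: `ToyChannelState`, `resumeChannel`, `timesResumed` 
and `resumeCount` are hypothetical names made up for illustration and are not 
the real Flink classes. It only counts how often the current channel would be 
resumed, with and without the `removeFromBlocked` call.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Set;

   public class DoubleUnblockSketch {

       /** Hypothetical stand-in for the handler's channel state; not Flink's real class. */
       static final class ToyChannelState {
           private final Set<String> blocked = new LinkedHashSet<>();
           private final List<String> resumed = new ArrayList<>();

           void blockChannel(String channel) {
               blocked.add(channel);
           }

           void removeFromBlocked(String channel) {
               blocked.remove(channel);
           }

           /** Resumes every channel currently assumed blocked, then forgets them. */
           void unblockAllChannels() {
               resumed.addAll(blocked);
               blocked.clear();
           }

           /** Models the follow-up state resuming the channel of an aligned barrier. */
           void resumeChannel(String channel) {
               resumed.add(channel);
           }

           int timesResumed(String channel) {
               return Collections.frequency(resumed, channel);
           }
       }

       /** Replays steps 1-3 and returns how often the current channel was resumed. */
       static int resumeCount(boolean rollBack) {
           ToyChannelState state = new ToyChannelState();
           String current = "channel-0";

           state.blockChannel(current);          // step 1: the barrier blocks its channel
           if (rollBack) {
               state.removeFromBlocked(current); // the change under discussion
           }
           state.unblockAllChannels();           // step 2: what alignmentTimeout does
           state.resumeChannel(current);         // step 3: barrierReceived on the new state
           return state.timesResumed(current);
       }

       public static void main(String[] args) {
           System.out.println("without rollback: " + resumeCount(false));
           System.out.println("with rollback:    " + resumeCount(true));
       }
   }
   ```
   
   In this model the channel is resumed twice without the rollback and once 
with it, which is the situation described in step 4.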
   
   I was considering putting the `blockChannel` call in an if-block, but this 
would make it even more complicated imo, as we would have to account for the 
case where it is the last barrier. Something like:
   
   ```
           state.removeSeenAnnouncement(channelInfo);
           boolean isTimedOut = controller.isTimedOut(checkpointBarrier);
           if (!isTimedOut) {
               state.blockChannel(channelInfo);
           }
           if (controller.allBarriersReceived()) {
               if (isTimedOut) {
                   state.blockChannel(channelInfo);
               }
               controller.triggerGlobalCheckpoint(checkpointBarrier);
               state.unblockAllChannels();
               return new AlternatingWaitingForFirstBarrier(state.getInputs());
           } else if (isTimedOut) {
               return alignmentTimeout(controller, checkpointBarrier)
                       .barrierReceived(controller, channelInfo, checkpointBarrier);
           }
   ```



