dawidwys commented on a change in pull request #15313:
URL: https://github.com/apache/flink/pull/15313#discussion_r603328329
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AbstractAlternatingAlignedBarrierHandlerAction.java
##########
@@ -74,9 +58,10 @@ public final BarrierHandlerAction barrierReceived(
context.triggerGlobalCheckpoint(checkpointBarrier);
state.unblockAllChannels();
return new AlternatingWaitingForFirstBarrier(state.getInputs());
- } else if (context.isTimedOut(checkpointBarrier)
- || checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint()) {
- return alignmentTimeout(context, checkpointBarrier);
+ } else if (context.isTimedOut(checkpointBarrier)) {
+ state.removeFromBlocked(channelInfo);
Review comment:
Yes, that is tricky. Let me try to explain.
1. We call `state.blockChannel(channelInfo)` in line 58, so this channel
will be assumed blocked.
2. In `alignmentTimeout` we unblock all channels that we assume blocked
(including the current channel from the step above).
3. We pass the current barrier and channel to the result of
`state.alignmentTimeout`. The barrier is aligned, so the state will try
to unblock the channel.
4. If we do not remove the current channel from the set of channels we assume
blocked, we will try to unblock the channel twice (once in
`alignmentTimeout`, and a second time in `barrierReceived` on the result of
`alignmentTimeout`).
In other words, you can read it as rolling back the changes made by the
current barrier, because that barrier will then be processed by the
intermediate state created by `alignmentTimeout`.
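A toy model of the four steps above, as a minimal sketch. Everything here (`ToyChannelState`, `timedOutBarrier`, integer channel ids, an unblock counter) is hypothetical and not Flink's API; it only assumes, as described above, that `alignmentTimeout` unblocks every channel assumed blocked and that the intermediate state then unblocks the current channel again when it processes the aligned barrier.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical toy model of the blocked-channel bookkeeping; not Flink code. */
public class RollbackSketch {

    /** Tracks channels assumed blocked and counts how often each one is unblocked. */
    static final class ToyChannelState {
        final Set<Integer> blocked = new LinkedHashSet<>();
        final Map<Integer, Integer> unblockCalls = new HashMap<>();

        void blockChannel(int channel) {
            blocked.add(channel);
        }

        void removeFromBlocked(int channel) {
            blocked.remove(channel);
        }

        void unblock(int channel) {
            unblockCalls.merge(channel, 1, Integer::sum);
        }

        void unblockAllChannels() {
            for (int channel : blocked) {
                unblock(channel);
            }
            blocked.clear();
        }
    }

    /** Simulates a timed-out barrier on {@code channel}; returns how often it was unblocked. */
    static int timedOutBarrier(int channel, boolean rollBack) {
        ToyChannelState state = new ToyChannelState();
        state.blockChannel(0);                 // an earlier barrier already blocked channel 0
        state.blockChannel(channel);           // step 1: current channel is assumed blocked
        if (rollBack) {
            state.removeFromBlocked(channel);  // roll back step 1 before the timeout
        }
        state.unblockAllChannels();            // step 2: alignmentTimeout unblocks all assumed-blocked channels
        state.unblock(channel);                // step 3: intermediate state unblocks the aligned barrier's channel
        return state.unblockCalls.getOrDefault(channel, 0);
    }

    public static void main(String[] args) {
        System.out.println("without rollback: " + timedOutBarrier(1, false)); // prints 2
        System.out.println("with rollback:    " + timedOutBarrier(1, true));  // prints 1
    }
}
```

Without the rollback the current channel is unblocked twice (step 4); with `removeFromBlocked` it is unblocked exactly once, by the intermediate state.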
I was considering putting the `blockChannel` call in an if-block, but this
would make it even more complicated imo, as we would have to account for the
situation when it is the last barrier. Something like:
```java
state.removeSeenAnnouncement(channelInfo);
boolean isTimedOut = controller.isTimedOut(checkpointBarrier);
if (!isTimedOut) {
    state.blockChannel(channelInfo);
}
if (controller.allBarriersReceived()) {
    if (isTimedOut) {
        state.blockChannel(channelInfo);
    }
    controller.triggerGlobalCheckpoint(checkpointBarrier);
    state.unblockAllChannels();
    return new AlternatingWaitingForFirstBarrier(state.getInputs());
} else if (isTimedOut) {
    return alignmentTimeout(controller, checkpointBarrier)
            .barrierReceived(controller, channelInfo, checkpointBarrier);
}
```
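To see why the last barrier needs its own `blockChannel` call in that variant, here is a minimal toy model of its branch structure. The names (`ToyState`, `onBarrier`, the `handleLastBarrier` toggle) are hypothetical and not Flink's API; the model assumes that a channel is resumed only if it is in the assumed-blocked set when `unblockAllChannels` runs, or when the intermediate state unblocks it explicitly.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical toy model of the conditional-blockChannel variant; not Flink code. */
public class ConditionalBlockSketch {

    static final class ToyState {
        final Set<Integer> blocked = new LinkedHashSet<>();
        final Map<Integer, Integer> unblockCalls = new HashMap<>();

        void blockChannel(int channel) {
            blocked.add(channel);
        }

        void unblock(int channel) {
            unblockCalls.merge(channel, 1, Integer::sum);
        }

        void unblockAllChannels() {
            for (int channel : blocked) {
                unblock(channel);
            }
            blocked.clear();
        }

        int unblockCount(int channel) {
            return unblockCalls.getOrDefault(channel, 0);
        }
    }

    /**
     * Mirrors the branches of the variant; {@code handleLastBarrier} toggles the
     * extra blockChannel call in the all-barriers-received branch.
     */
    static int onBarrier(int channel, boolean isTimedOut, boolean allBarriersReceived,
                         boolean handleLastBarrier) {
        ToyState state = new ToyState();
        if (!isTimedOut) {
            state.blockChannel(channel);
        }
        if (allBarriersReceived) {
            if (isTimedOut && handleLastBarrier) {
                state.blockChannel(channel);   // the extra case for a timed-out last barrier
            }
            state.unblockAllChannels();        // stands in for triggerGlobalCheckpoint + unblockAllChannels
        } else if (isTimedOut) {
            state.unblockAllChannels();        // stands in for alignmentTimeout
            state.unblock(channel);            // intermediate state processes the aligned barrier
        }
        return state.unblockCount(channel);
    }

    public static void main(String[] args) {
        System.out.println(onBarrier(1, true, true, true));   // prints 1
        System.out.println(onBarrier(1, true, true, false));  // prints 0: channel never resumed
        System.out.println(onBarrier(1, true, false, true));  // prints 1
    }
}
```

In this model, dropping the extra call leaves a timed-out last barrier's channel without any unblock, which is the additional case the if-block approach has to handle.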