rkhachatryan commented on a change in pull request #15146:
URL: https://github.com/apache/flink/pull/15146#discussion_r592467014
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingController.java
##########
@@ -74,82 +90,152 @@ public void barrierAnnouncement(
}
}
+ private void scheduleAnnouncementTimeout(
+ InputChannelInfo channelInfo, CheckpointBarrier announcedBarrier,
int sequenceNumber) {
+ delayedActionRegistration.schedule(
+ () -> {
+ long barrierId = announcedBarrier.getId();
+ if (lastSeenBarrier == barrierId
+ && lastCompletedBarrier < barrierId
+ && activeController == alignedController) {
+ // Let's timeout this barrier
+ unalignedController.barrierAnnouncement(
+ channelInfo, announcedBarrier, sequenceNumber);
+ }
+ return null;
+ },
+ Duration.ofMillis(
+
announcedBarrier.getCheckpointOptions().getAlignmentTimeout() + 1));
+ }
+
+ private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+ if (announcedBarrier.getCheckpointOptions().isTimeoutable()) {
+ return clock.relativeTimeNanos();
+ } else {
+ return Long.MAX_VALUE;
+ }
+ }
+
@Override
- public Optional<CheckpointBarrier> barrierReceived(
- InputChannelInfo channelInfo, CheckpointBarrier barrier)
+ public void barrierReceived(
+ InputChannelInfo channelInfo,
+ CheckpointBarrier barrier,
+ ThrowingConsumer<CheckpointBarrier, IOException> triggerCheckpoint)
throws IOException, CheckpointException {
if (barrier.getCheckpointOptions().isUnalignedCheckpoint()
&& activeController == alignedController) {
- switchToUnaligned(channelInfo, barrier);
- activeController.barrierReceived(channelInfo, barrier);
- return Optional.of(barrier);
+ switchToUnaligned(channelInfo, barrier, triggerCheckpoint);
+ activeController.barrierReceived(channelInfo, barrier,
triggerCheckpoint);
}
Optional<CheckpointBarrier> maybeTimedOut = asTimedOut(barrier);
barrier = maybeTimedOut.orElse(barrier);
- checkState(!activeController.barrierReceived(channelInfo,
barrier).isPresent());
+ activeController.barrierReceived(
+ channelInfo,
+ barrier,
+ checkpointBarrier -> {
+ throw new IllegalStateException("Control should not
trigger a checkpoint");
+ });
if (maybeTimedOut.isPresent()) {
if (activeController == alignedController) {
- switchToUnaligned(channelInfo, maybeTimedOut.get());
- return maybeTimedOut;
+ switchToUnaligned(channelInfo, maybeTimedOut.get(), b -> {});
+ triggerCheckpoint.accept(maybeTimedOut.get());
} else {
alignedController.resumeConsumption(channelInfo);
}
} else if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()
&& activeController == unalignedController) {
alignedController.resumeConsumption(channelInfo);
}
- return Optional.empty();
}
@Override
- public Optional<CheckpointBarrier> preProcessFirstBarrier(
- InputChannelInfo channelInfo, CheckpointBarrier barrier)
+ public void preProcessFirstBarrier(
+ InputChannelInfo channelInfo,
+ CheckpointBarrier barrier,
+ ThrowingConsumer<CheckpointBarrier, IOException> triggerCheckpoint)
throws IOException, CheckpointException {
if (lastSeenBarrier < barrier.getId()) {
lastSeenBarrier = barrier.getId();
- firstBarrierArrivalTime = getArrivalTime(barrier);
+ lastBarrierArrivalTime = getArrivalTime(barrier);
+ if (barrier.getCheckpointOptions().isTimeoutable()
+ && activeController == alignedController) {
+ scheduleSwitchToUnaligned(channelInfo, barrier,
triggerCheckpoint);
+ }
Review comment:
I don't understand when this branch can be executed: if controller is
aligned now then there was an annoucement for this barrier which has already
updated `lastSeenBarrier`. Could you explain? (I'm not sure whether it's a dead
code or not).
If this is not dead code, then we are potentially scheduling the switch
unnecessarily because active controller can updated to unaligned a few lines
below (and it will likely lead to some assertion failures).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]