dawidwys commented on a change in pull request #15146:
URL: https://github.com/apache/flink/pull/15146#discussion_r594150872



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingController.java
##########
@@ -74,82 +90,152 @@ public void barrierAnnouncement(
         }
     }
 
+    private void scheduleAnnouncementTimeout(
+            InputChannelInfo channelInfo, CheckpointBarrier announcedBarrier, 
int sequenceNumber) {
+        delayedActionRegistration.schedule(
+                () -> {
+                    long barrierId = announcedBarrier.getId();
+                    if (lastSeenBarrier == barrierId
+                            && lastCompletedBarrier < barrierId
+                            && activeController == alignedController) {
+                        // Let's timeout this barrier
+                        unalignedController.barrierAnnouncement(
+                                channelInfo, announcedBarrier, sequenceNumber);
+                    }
+                    return null;
+                },
+                Duration.ofMillis(
+                        
announcedBarrier.getCheckpointOptions().getAlignmentTimeout() + 1));
+    }
+
+    private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+        if (announcedBarrier.getCheckpointOptions().isTimeoutable()) {
+            return clock.relativeTimeNanos();
+        } else {
+            return Long.MAX_VALUE;
+        }
+    }
+
     @Override
-    public Optional<CheckpointBarrier> barrierReceived(
-            InputChannelInfo channelInfo, CheckpointBarrier barrier)
+    public void barrierReceived(
+            InputChannelInfo channelInfo,
+            CheckpointBarrier barrier,
+            ThrowingConsumer<CheckpointBarrier, IOException> triggerCheckpoint)
             throws IOException, CheckpointException {
         if (barrier.getCheckpointOptions().isUnalignedCheckpoint()
                 && activeController == alignedController) {
-            switchToUnaligned(channelInfo, barrier);
-            activeController.barrierReceived(channelInfo, barrier);
-            return Optional.of(barrier);
+            switchToUnaligned(channelInfo, barrier, triggerCheckpoint);
+            activeController.barrierReceived(channelInfo, barrier, 
triggerCheckpoint);
         }
 
         Optional<CheckpointBarrier> maybeTimedOut = asTimedOut(barrier);
         barrier = maybeTimedOut.orElse(barrier);
 
-        checkState(!activeController.barrierReceived(channelInfo, 
barrier).isPresent());
+        activeController.barrierReceived(
+                channelInfo,
+                barrier,
+                checkpointBarrier -> {
+                    throw new IllegalStateException("Control should not 
trigger a checkpoint");
+                });
 
         if (maybeTimedOut.isPresent()) {
             if (activeController == alignedController) {
-                switchToUnaligned(channelInfo, maybeTimedOut.get());
-                return maybeTimedOut;
+                switchToUnaligned(channelInfo, maybeTimedOut.get(), b -> {});
+                triggerCheckpoint.accept(maybeTimedOut.get());
             } else {
                 alignedController.resumeConsumption(channelInfo);
             }
         } else if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()
                 && activeController == unalignedController) {
             alignedController.resumeConsumption(channelInfo);
         }
-        return Optional.empty();
     }
 
     @Override
-    public Optional<CheckpointBarrier> preProcessFirstBarrier(
-            InputChannelInfo channelInfo, CheckpointBarrier barrier)
+    public void preProcessFirstBarrier(
+            InputChannelInfo channelInfo,
+            CheckpointBarrier barrier,
+            ThrowingConsumer<CheckpointBarrier, IOException> triggerCheckpoint)
             throws IOException, CheckpointException {
         if (lastSeenBarrier < barrier.getId()) {
             lastSeenBarrier = barrier.getId();
-            firstBarrierArrivalTime = getArrivalTime(barrier);
+            lastBarrierArrivalTime = getArrivalTime(barrier);
+            if (barrier.getCheckpointOptions().isTimeoutable()
+                    && activeController == alignedController) {
+                scheduleSwitchToUnaligned(channelInfo, barrier, 
triggerCheckpoint);
+            }
         }
         activeController = chooseController(barrier);
-        return activeController.preProcessFirstBarrier(channelInfo, barrier);
+        activeController.preProcessFirstBarrier(channelInfo, barrier, 
triggerCheckpoint);
     }
 
-    private void switchToUnaligned(InputChannelInfo channelInfo, 
CheckpointBarrier barrier)
+    private void scheduleSwitchToUnaligned(
+            InputChannelInfo channelInfo,
+            CheckpointBarrier barrier,
+            ThrowingConsumer<CheckpointBarrier, IOException> 
triggerCheckpoint) {
+        delayedActionRegistration.schedule(
+                () -> {
+                    long barrierId = barrier.getId();
+                    if (lastSeenBarrier == barrierId
+                            && lastCompletedBarrier < barrierId
+                            && activeController == alignedController) {
+                        switchToUnaligned(channelInfo, barrier.asUnaligned(), 
triggerCheckpoint);
+                    }
+                    return null;
+                },
+                
Duration.ofMillis(barrier.getCheckpointOptions().getAlignmentTimeout() + 1));

Review comment:
       The `+ 1` preserves the previous definition of a timed-out barrier, which used a strict comparison:
   ```
   barrier.getCheckpointOptions().getAlignmentTimeout() * 1_000_000
                           < (System.nanoTime() - firstBarrierArrivalTime)
   ```
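   That is, reaching exactly `firstBarrierArrivalTime` + alignment timeout was not yet considered timed out, so the delayed action is scheduled one millisecond after the alignment timeout.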




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to