rkhachatryan commented on a change in pull request #15146:
URL: https://github.com/apache/flink/pull/15146#discussion_r592459388



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingController.java
##########
@@ -74,82 +90,152 @@ public void barrierAnnouncement(
         }
     }
 
+    private void scheduleAnnouncementTimeout(
+            InputChannelInfo channelInfo, CheckpointBarrier announcedBarrier, int sequenceNumber) {
+        delayedActionRegistration.schedule(
+                () -> {
+                    long barrierId = announcedBarrier.getId();
+                    if (lastSeenBarrier == barrierId
+                            && lastCompletedBarrier < barrierId
+                            && activeController == alignedController) {

Review comment:
       The check for whether this (announced) barrier was aborted seems missing.
   I see that `lastCompletedBarrier` currently isn't updated on abortion.
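   A minimal sketch of the missing guard, written as a standalone class. `lastSeenBarrier` and `lastCompletedBarrier` come from the snippet above. `lastAbortedBarrier`, `abort` and `shouldTimeout` are hypothetical. An alternative would be to update `lastCompletedBarrier` on abortion instead.

```java
/**
 * Toy model of the timer's guard condition. Only lastSeenBarrier and
 * lastCompletedBarrier exist in the PR; lastAbortedBarrier and abort() are hypothetical.
 */
final class TimeoutGuard {
    long lastSeenBarrier = -1L;
    long lastCompletedBarrier = -1L;
    // Hypothetical: would have to be updated wherever the checkpoint gets aborted.
    long lastAbortedBarrier = -1L;

    void abort(long barrierId) {
        lastAbortedBarrier = Math.max(lastAbortedBarrier, barrierId);
    }

    /** Returns true if a timer scheduled for barrierId should still switch to unaligned. */
    boolean shouldTimeout(long barrierId, boolean alignedControllerActive) {
        return lastSeenBarrier == barrierId
                && lastCompletedBarrier < barrierId
                && lastAbortedBarrier < barrierId // the check that seems to be missing
                && alignedControllerActive;
    }
}
```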

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingController.java
##########
@@ -188,12 +266,12 @@ private boolean canTimeout(CheckpointBarrier barrier) {
         return barrier.getCheckpointOptions().isTimeoutable()
                 && barrier.getId() <= lastSeenBarrier
                 && barrier.getCheckpointOptions().getAlignmentTimeout() * 1_000_000
-                        < (System.nanoTime() - firstBarrierArrivalTime);
+                        < (clock.relativeTimeNanos() - lastBarrierArrivalTime);
     }
 
-    private long getArrivalTime(CheckpointBarrier announcedBarrier) {
-        return announcedBarrier.getCheckpointOptions().isTimeoutable()
-                ? System.nanoTime()
-                : Long.MAX_VALUE;
+    /** A provider for a method to register a delayed action. */
+    @FunctionalInterface
+    public interface DelayedActionRegistration {
+        void schedule(Callable<?> callable, Duration delay);

Review comment:
       I'm thinking about cancellation of the scheduled timers. Depending on the effective checkpoint interval and alignmentTimeout, there can be up to alignmentTimeout / interval active timers.
   
   E.g. if interval = 1s and timeout = 20m, then numTimers = (20 * 60s) / 1s = 1200, with one firing every second. This is further multiplied by the number of tasks on a single TM.
   
   Or am I missing something?
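   To make the numbers concrete, here is a standalone sketch that keeps at most one pending timer by cancelling the previous one. It uses the JDK's `ScheduledThreadPoolExecutor` rather than the PR's `DelayedActionRegistration`. That interface's `schedule` returns `void`, so it provides no handle to cancel. The class `SingleTimeoutTimer` is hypothetical and does not model how the registration is wired into the task.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: keeps at most one pending alignment-timeout timer. */
final class SingleTimeoutTimer {
    /** Timers pending at once if none are cancelled: one per checkpoint started within the timeout. */
    static long maxPendingTimers(Duration alignmentTimeout, Duration checkpointInterval) {
        return alignmentTimeout.toMillis() / checkpointInterval.toMillis();
    }

    private static final ThreadFactory DAEMON_THREADS = runnable -> {
        Thread thread = new Thread(runnable, "alignment-timeout-timer");
        thread.setDaemon(true); // never keep the JVM alive just for a pending timeout
        return thread;
    };

    private final ScheduledThreadPoolExecutor executor =
            new ScheduledThreadPoolExecutor(1, DAEMON_THREADS);
    private ScheduledFuture<?> pending;

    SingleTimeoutTimer() {
        // Remove cancelled tasks from the queue immediately instead of at their deadline.
        executor.setRemoveOnCancelPolicy(true);
    }

    /** Schedules the timeout for a newer checkpoint and cancels the previous, now irrelevant one. */
    synchronized void reschedule(Runnable timeoutAction, Duration delay) {
        if (pending != null) {
            pending.cancel(false);
        }
        pending = executor.schedule(timeoutAction, delay.toMillis(), TimeUnit.MILLISECONDS);
    }

    /** Number of timers still waiting in the executor's queue. */
    int pendingTimers() {
        return executor.getQueue().size();
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

   With `interval = 1s` and `timeout = 20m`, `maxPendingTimers` yields the 1200 from above. Rescheduling 100 times leaves a single queued timer.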

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingControllerTest.java
##########
@@ -259,6 +298,47 @@ private void testTimeoutBarrierOnTwoChannels(
         assertData(gate);
     }
 
+    //    @Test
+    //    public void testActiveTimeoutAlignment() throws Exception {
+    //        int numberOfChannels = 2;
+    //        SingleInputGate inputGate =

Review comment:
       I guess this is a leftover that should be removed in this commit, not the next one?
   
   Could you also please make the commit message more informative?
   (`[refactor] Extracted builders`)

##########
File path: 
flink-core/src/main/java/org/apache/flink/util/function/TriConsumerWithException.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Operation which is performed on three given arguments.
+ *
+ * @param <S> type of the first argument
+ * @param <T> type of the second argument
+ * @param <U> type of the third argument
+ */
+@PublicEvolving
+@FunctionalInterface
+public interface TriConsumerWithException<S, T, U, E extends Throwable> {

Review comment:
       How about more informative names for type parameters here?
   For example, `<First, Second, Third, Err extends Throwable>`.
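   For illustration, the interface could read as follows with the suggested names. The hunk is cut off before the interface body, so the single `accept` method is an assumption. The name `TriConsumerWithExceptionSketch` is only there to keep the example standalone.

```java
/**
 * Operation which is performed on three given arguments and may throw.
 *
 * @param <First> type of the first argument
 * @param <Second> type of the second argument
 * @param <Third> type of the third argument
 * @param <Err> type of the exception that may be thrown
 */
@FunctionalInterface
interface TriConsumerWithExceptionSketch<First, Second, Third, Err extends Throwable> {
    void accept(First first, Second second, Third third) throws Err;
}
```

   The sketch also documents the exception type parameter, which the Javadoc in the hunk leaves out.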

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingController.java
##########
@@ -74,82 +90,152 @@ public void barrierAnnouncement(
         }
     }
 
+    private void scheduleAnnouncementTimeout(
+            InputChannelInfo channelInfo, CheckpointBarrier announcedBarrier, int sequenceNumber) {
+        delayedActionRegistration.schedule(
+                () -> {
+                    long barrierId = announcedBarrier.getId();
+                    if (lastSeenBarrier == barrierId
+                            && lastCompletedBarrier < barrierId
+                            && activeController == alignedController) {
+                        // Let's timeout this barrier
+                        unalignedController.barrierAnnouncement(
+                                channelInfo, announcedBarrier, sequenceNumber);

Review comment:
       Why did you use the `barrierAnnouncement` method here and not `switchToUnaligned`, as in another place?
   Shouldn't we transfer all received announcements, switch the active controller, and so on?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingController.java
##########
@@ -35,15 +39,23 @@
 public class AlternatingController implements CheckpointBarrierBehaviourController {
     private final AlignedController alignedController;
     private final UnalignedController unalignedController;
+    private final DelayedActionRegistration delayedActionRegistration;
+    private final Clock clock;
 
     private CheckpointBarrierBehaviourController activeController;
-    private long firstBarrierArrivalTime = Long.MAX_VALUE;
+    private long lastBarrierArrivalTime = Long.MAX_VALUE;

Review comment:
       The meaning of this variable is: the arrival time of the first barrier of the last seen checkpoint :)
   So neither the old `firstBarrierArrivalTime` nor the new `lastBarrierArrivalTime` reflects it fully.
   But to me, the old name is less confusing.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingController.java
##########
@@ -74,82 +90,152 @@ public void barrierAnnouncement(
         }
     }
 
+    private void scheduleAnnouncementTimeout(
+            InputChannelInfo channelInfo, CheckpointBarrier announcedBarrier, int sequenceNumber) {
+        delayedActionRegistration.schedule(
+                () -> {
+                    long barrierId = announcedBarrier.getId();
+                    if (lastSeenBarrier == barrierId
+                            && lastCompletedBarrier < barrierId
+                            && activeController == alignedController) {
+                        // Let's timeout this barrier
+                        unalignedController.barrierAnnouncement(
+                                channelInfo, announcedBarrier, sequenceNumber);
+                    }
+                    return null;
+                },
+                Duration.ofMillis(
+                        announcedBarrier.getCheckpointOptions().getAlignmentTimeout() + 1));
+    }
+
+    private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+        if (announcedBarrier.getCheckpointOptions().isTimeoutable()) {
+            return clock.relativeTimeNanos();
+        } else {
+            return Long.MAX_VALUE;
+        }
+    }
+
     @Override
-    public Optional<CheckpointBarrier> barrierReceived(
-            InputChannelInfo channelInfo, CheckpointBarrier barrier)
+    public void barrierReceived(
+            InputChannelInfo channelInfo,
+            CheckpointBarrier barrier,
+            ThrowingConsumer<CheckpointBarrier, IOException> triggerCheckpoint)
             throws IOException, CheckpointException {
         if (barrier.getCheckpointOptions().isUnalignedCheckpoint()
                 && activeController == alignedController) {
-            switchToUnaligned(channelInfo, barrier);
-            activeController.barrierReceived(channelInfo, barrier);
-            return Optional.of(barrier);
+            switchToUnaligned(channelInfo, barrier, triggerCheckpoint);
+            activeController.barrierReceived(channelInfo, barrier, triggerCheckpoint);
         }
 
         Optional<CheckpointBarrier> maybeTimedOut = asTimedOut(barrier);
         barrier = maybeTimedOut.orElse(barrier);
 
-        checkState(!activeController.barrierReceived(channelInfo, barrier).isPresent());
+        activeController.barrierReceived(
+                channelInfo,
+                barrier,
+                checkpointBarrier -> {
+                    throw new IllegalStateException("Control should not trigger a checkpoint");
+                });
 
         if (maybeTimedOut.isPresent()) {
             if (activeController == alignedController) {
-                switchToUnaligned(channelInfo, maybeTimedOut.get());
-                return maybeTimedOut;
+                switchToUnaligned(channelInfo, maybeTimedOut.get(), b -> {});
+                triggerCheckpoint.accept(maybeTimedOut.get());
             } else {
                 alignedController.resumeConsumption(channelInfo);
             }
         } else if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()
                 && activeController == unalignedController) {
             alignedController.resumeConsumption(channelInfo);
         }
-        return Optional.empty();
     }
 
     @Override
-    public Optional<CheckpointBarrier> preProcessFirstBarrier(
-            InputChannelInfo channelInfo, CheckpointBarrier barrier)
+    public void preProcessFirstBarrier(
+            InputChannelInfo channelInfo,
+            CheckpointBarrier barrier,
+            ThrowingConsumer<CheckpointBarrier, IOException> triggerCheckpoint)
             throws IOException, CheckpointException {
         if (lastSeenBarrier < barrier.getId()) {
             lastSeenBarrier = barrier.getId();
-            firstBarrierArrivalTime = getArrivalTime(barrier);
+            lastBarrierArrivalTime = getArrivalTime(barrier);
+            if (barrier.getCheckpointOptions().isTimeoutable()
+                    && activeController == alignedController) {
+                scheduleSwitchToUnaligned(channelInfo, barrier, triggerCheckpoint);
+            }

Review comment:
       I don't understand when this branch can be executed: if the controller is aligned now, then there was an announcement for this barrier, which has already updated `lastSeenBarrier`.
   
   If this is not dead code, then we are potentially scheduling the switch unnecessarily, because the active controller can be updated to unaligned a few lines below (and that will likely lead to some assertion failures).

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingController.java
##########
@@ -58,7 +70,11 @@ public void barrierAnnouncement(
             throws IOException {
         if (lastSeenBarrier < announcedBarrier.getId()) {
             lastSeenBarrier = announcedBarrier.getId();
-            firstBarrierArrivalTime = getArrivalTime(announcedBarrier);
+            lastBarrierArrivalTime = getArrivalTime(announcedBarrier);
+            if (announcedBarrier.getCheckpointOptions().isTimeoutable()
+                    && activeController == alignedController) {
+                scheduleAnnouncementTimeout(channelInfo, announcedBarrier, sequenceNumber);

Review comment:
       I think we should also take into account whether we are going to time out the checkpoint passively (below).
   One option would be to store a flag in this branch and move `scheduleAnnouncementTimeout` to the end of the method.
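   A toy model of that option, under stated assumptions. The passive timeout further down in `barrierAnnouncement` is reduced to a boolean parameter, `passivelyTimedOut`. Scheduling a timer is reduced to recording the barrier id. Apart from `lastSeenBarrier`, all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of barrierAnnouncement; everything except lastSeenBarrier is hypothetical. */
final class AnnouncementSketch {
    long lastSeenBarrier = -1L;
    boolean alignedControllerActive = true;
    final List<Long> scheduledTimeouts = new ArrayList<>();

    void barrierAnnouncement(long barrierId, boolean timeoutable, boolean passivelyTimedOut) {
        boolean shouldScheduleTimeout = false;
        if (lastSeenBarrier < barrierId) {
            lastSeenBarrier = barrierId;
            // Only remember the decision here...
            shouldScheduleTimeout = timeoutable && alignedControllerActive;
        }
        if (passivelyTimedOut) {
            // ...because the passive path may already switch to unaligned below...
            alignedControllerActive = false;
        }
        // ...and act on it at the end, once that is known.
        if (shouldScheduleTimeout && alignedControllerActive) {
            scheduledTimeouts.add(barrierId);
        }
    }
}
```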

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingController.java
##########
@@ -74,82 +90,152 @@ public void barrierAnnouncement(
         }
     }
 
+    private void scheduleAnnouncementTimeout(
+            InputChannelInfo channelInfo, CheckpointBarrier announcedBarrier, int sequenceNumber) {
+        delayedActionRegistration.schedule(
+                () -> {
+                    long barrierId = announcedBarrier.getId();
+                    if (lastSeenBarrier == barrierId
+                            && lastCompletedBarrier < barrierId
+                            && activeController == alignedController) {
+                        // Let's timeout this barrier
+                        unalignedController.barrierAnnouncement(
+                                channelInfo, announcedBarrier, sequenceNumber);
+                    }
+                    return null;
+                },
+                Duration.ofMillis(
+                        announcedBarrier.getCheckpointOptions().getAlignmentTimeout() + 1));
+    }
+
+    private long getArrivalTime(CheckpointBarrier announcedBarrier) {
+        if (announcedBarrier.getCheckpointOptions().isTimeoutable()) {
+            return clock.relativeTimeNanos();
+        } else {
+            return Long.MAX_VALUE;
+        }
+    }
+
     @Override
-    public Optional<CheckpointBarrier> barrierReceived(
-            InputChannelInfo channelInfo, CheckpointBarrier barrier)
+    public void barrierReceived(
+            InputChannelInfo channelInfo,
+            CheckpointBarrier barrier,
+            ThrowingConsumer<CheckpointBarrier, IOException> triggerCheckpoint)
             throws IOException, CheckpointException {
         if (barrier.getCheckpointOptions().isUnalignedCheckpoint()
                 && activeController == alignedController) {
-            switchToUnaligned(channelInfo, barrier);
-            activeController.barrierReceived(channelInfo, barrier);
-            return Optional.of(barrier);
+            switchToUnaligned(channelInfo, barrier, triggerCheckpoint);
+            activeController.barrierReceived(channelInfo, barrier, triggerCheckpoint);
         }
 
         Optional<CheckpointBarrier> maybeTimedOut = asTimedOut(barrier);
         barrier = maybeTimedOut.orElse(barrier);
 
-        checkState(!activeController.barrierReceived(channelInfo, barrier).isPresent());
+        activeController.barrierReceived(
+                channelInfo,
+                barrier,
+                checkpointBarrier -> {
+                    throw new IllegalStateException("Control should not trigger a checkpoint");
+                });
 
         if (maybeTimedOut.isPresent()) {
             if (activeController == alignedController) {
-                switchToUnaligned(channelInfo, maybeTimedOut.get());
-                return maybeTimedOut;
+                switchToUnaligned(channelInfo, maybeTimedOut.get(), b -> {});
+                triggerCheckpoint.accept(maybeTimedOut.get());
             } else {
                 alignedController.resumeConsumption(channelInfo);
             }
         } else if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()
                 && activeController == unalignedController) {
             alignedController.resumeConsumption(channelInfo);
         }
-        return Optional.empty();
     }
 
     @Override
-    public Optional<CheckpointBarrier> preProcessFirstBarrier(
-            InputChannelInfo channelInfo, CheckpointBarrier barrier)
+    public void preProcessFirstBarrier(
+            InputChannelInfo channelInfo,
+            CheckpointBarrier barrier,
+            ThrowingConsumer<CheckpointBarrier, IOException> triggerCheckpoint)
             throws IOException, CheckpointException {
         if (lastSeenBarrier < barrier.getId()) {
             lastSeenBarrier = barrier.getId();
-            firstBarrierArrivalTime = getArrivalTime(barrier);
+            lastBarrierArrivalTime = getArrivalTime(barrier);
+            if (barrier.getCheckpointOptions().isTimeoutable()
+                    && activeController == alignedController) {
+                scheduleSwitchToUnaligned(channelInfo, barrier, triggerCheckpoint);
+            }
         }
         activeController = chooseController(barrier);
-        return activeController.preProcessFirstBarrier(channelInfo, barrier);
+        activeController.preProcessFirstBarrier(channelInfo, barrier, triggerCheckpoint);
     }
 
-    private void switchToUnaligned(InputChannelInfo channelInfo, CheckpointBarrier barrier)
+    private void scheduleSwitchToUnaligned(
+            InputChannelInfo channelInfo,
+            CheckpointBarrier barrier,
+            ThrowingConsumer<CheckpointBarrier, IOException> triggerCheckpoint) {
+        delayedActionRegistration.schedule(
+                () -> {
+                    long barrierId = barrier.getId();
+                    if (lastSeenBarrier == barrierId
+                            && lastCompletedBarrier < barrierId
+                            && activeController == alignedController) {
+                        switchToUnaligned(channelInfo, barrier.asUnaligned(), triggerCheckpoint);
+                    }
+                    return null;
+                },
+                Duration.ofMillis(barrier.getCheckpointOptions().getAlignmentTimeout() + 1));

Review comment:
       Could you explain why we need the +1 here?
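   One possible reason, offered only as a guess for the author to confirm or reject: `canTimeout` in the earlier hunk uses a strict `<`. Under that comparison, an elapsed time exactly equal to the alignment timeout does not count as timed out. The standalone sketch below reproduces only that comparison. `StrictTimeoutCheck`, `isTimedOut` and `firedAfter` are hypothetical names.

```java
import java.time.Duration;

/** Reproduces only the comparison from canTimeout; names are hypothetical. */
final class StrictTimeoutCheck {
    /** Same shape as canTimeout: timeout in ms, converted to ns, must be strictly below elapsed ns. */
    static boolean isTimedOut(long alignmentTimeoutMillis, long elapsedNanos) {
        return alignmentTimeoutMillis * 1_000_000 < elapsedNanos;
    }

    /** Elapsed nanos if a timer fires exactly after the given delay. */
    static long firedAfter(Duration delay) {
        return delay.toNanos();
    }
}
```

   A timer delayed by exactly `timeout` ms would fail the strict check, while `timeout + 1` ms would pass it. Whether the scheduled actions are meant to agree with `canTimeout` is the open question.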




----------------------------------------------------------------
This is an automated message from the Apache Git Service.