lindong28 commented on code in PR #22931:
URL: https://github.com/apache/flink/pull/22931#discussion_r1266284788


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -291,13 +313,27 @@ public CheckpointCoordinator(
 
         // it does not make sense to schedule checkpoints more often then the 
desired
         // time between checkpoints
-        long baseInterval = chkConfig.getCheckpointInterval();
-        if (baseInterval < minPauseBetweenCheckpoints) {
-            baseInterval = minPauseBetweenCheckpoints;
+        long checkpointInterval = chkConfig.getCheckpointInterval();
+        if (checkpointInterval < minPauseBetweenCheckpoints) {
+            checkpointInterval = minPauseBetweenCheckpoints;
+        }
+
+        long checkpointIntervalDuringBacklog = 
chkConfig.getCheckpointIntervalDuringBacklog();
+        if (checkpointIntervalDuringBacklog < minPauseBetweenCheckpoints) {
+            checkpointIntervalDuringBacklog = minPauseBetweenCheckpoints;
+        }
+
+        if (checkpointIntervalDuringBacklog < checkpointInterval) {
+            throw new IllegalArgumentException(
+                    "execution.checkpoint.interval-during-backlog must either 
be 0, "
+                            + "or be larger than or equal to 
execution.checkpoint.interval.");
         }
 
         this.job = checkNotNull(job);
-        this.baseInterval = baseInterval;
+        this.checkpointInterval = checkpointInterval;
+        this.checkpointIntervalDuringBacklog = checkpointIntervalDuringBacklog;
+        this.lastCheckpointTriggeringRelativeTime = NO_CHECKPOINT;

Review Comment:
   Would it be simpler to just initialize this variable to 0 without using 
`NO_CHECKPOINT`?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java:
##########
@@ -303,6 +303,8 @@ public Object getPreviousEnumerator() {
         }
         LOG.info("Starting enumerator for sourceIndex={}", currentSourceIndex);
         currentEnumerator.start();
+
+        context.setIsProcessingBacklog(currentSourceIndex < sources.size() - 
1);

Review Comment:
   Would it be better to invoke `#setIsProcessingBacklog` before starting 
`currentEnumerator`?
   
   Otherwise, there might be a short interval during which the hybrid source has 
started to run the new stage but the backlog status has not yet been updated. 
This is unlikely to be a problem in practice, but it would be better to avoid 
it entirely.
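
A minimal sketch of the suggested ordering, assuming simplified stand-ins for the real types: `Context`, `Enumerator`, and `switchEnumerator` are hypothetical names used only for illustration and are not the Flink interfaces. It only shows that the backlog status is reported before the new enumerator starts.

```java
import java.util.ArrayList;
import java.util.List;

public final class EnumeratorSwitchOrder {
    /** Hypothetical stand-in for the enumerator context. */
    interface Context {
        void setIsProcessingBacklog(boolean isProcessingBacklog);
    }

    /** Hypothetical stand-in for a split enumerator. */
    interface Enumerator {
        void start();
    }

    /** Reports the backlog status first, then starts the enumerator. */
    static void switchEnumerator(
            Context context, Enumerator enumerator, int currentSourceIndex, int sourceCount) {
        // Every source except the last one is treated as processing backlog.
        context.setIsProcessingBacklog(currentSourceIndex < sourceCount - 1);
        enumerator.start();
    }

    /** Records the order of calls when switching to the first of two sources. */
    public static List<String> demo() {
        List<String> events = new ArrayList<>();
        switchEnumerator(b -> events.add("backlog=" + b), () -> events.add("start"), 0, 2);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this ordering there is no window in which the new stage is running while the previous backlog status is still in effect.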



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -164,6 +167,15 @@ public void lazyInitialize(
         context.lazyInitialize(
                 globalFailureHandler, mainThreadExecutor, 
operatorCoordinatorMetricGroup);
 
+        OperatorCoordinator rootCoordinator = coordinator;
+        if (coordinator instanceof RecreateOnResetOperatorCoordinator) {
+            rootCoordinator =
+                    ((RecreateOnResetOperatorCoordinator) 
rootCoordinator).getInternalCoordinator();
+        }
+        if (rootCoordinator instanceof SourceCoordinator) {
+            ((SourceCoordinator<?, ?>) 
rootCoordinator).lazyInitialize(checkpointCoordinator);

Review Comment:
   Could you confirm that it is not possible (or much harder) to pass 
`checkpointCoordinator` to `SourceCoordinatorContext`'s constructor?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -2079,11 +2149,19 @@ private final class ScheduledTrigger implements 
Runnable {
 
         @Override
         public void run() {
+            long currentTime = clock.relativeTimeMillis();
+            if (lastCheckpointTriggeringRelativeTime != NO_CHECKPOINT
+                    && currentTime - lastCheckpointTriggeringRelativeTime < 
baseInterval) {

Review Comment:
   Would it be simpler to do this:
   
   ```
   if (currentTime - lastCheckpointTriggeringRelativeTime < baseInterval) {
       return;
   }
   ```



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -222,6 +222,25 @@ public void setCheckpointInterval(long checkpointInterval) 
{
                 Duration.ofMillis(checkpointInterval));
     }
 
+    public long getCheckpointIntervalDuringBacklog() {
+        return configuration
+                
.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG)
+                .map(Duration::toMillis)
+                .orElseGet(this::getCheckpointInterval);
+    }
+
+    public void setCheckpointIntervalDuringBacklog(long checkpointInterval) {
+        if (checkpointInterval != -1L && checkpointInterval < 
MINIMAL_CHECKPOINT_TIME) {

Review Comment:
   Can you explain when the input `checkpointInterval` will have the value -1?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -150,10 +152,22 @@ public class CheckpointCoordinator {
     private final CheckpointIDCounter checkpointIdCounter;
 
     /**
-     * The base checkpoint interval. Actual trigger time may be affected by 
the max concurrent
-     * checkpoints and minimum-pause values
+     * The checkpoint interval in normal situations. Actual trigger time may 
be affected by the max
+     * concurrent checkpoints, minimum-pause values and checkpoint interval 
during backlog.
      */
-    private final long baseInterval;
+    private final long checkpointInterval;
+
+    /**
+     * The checkpoint interval when any source reports 
isProcessingBacklog=true. Actual trigger time
+     * may be affected by the max concurrent checkpoints and minimum-pause 
values.
+     */
+    private final long checkpointIntervalDuringBacklog;

Review Comment:
   Would it be better to keep the name consistent with the existing 
`baseInterval`? We can either rename `baseInterval` as `checkpointingInterval`, 
or rename `checkpointIntervalDuringBacklog` as `baseIntervalDuringBacklog`.
   



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java:
##########
@@ -181,4 +181,16 @@ <T> void callAsync(
      * @param runnable a runnable to execute
      */
     void runInCoordinatorThread(Runnable runnable);
+
+    /**
+     * Reports to JM whether this source is currently processing backlog.
+     *
+     * <p>When source is processing backlog, it means the records being 
emitted by this source is
+     * already stale and there is no processing latency requirement for these 
records. This allows
+     * downstream operators to optimize throughput instead of reducing latency 
for intermediate
+     * results.
+     *
+     * <p>If this method is not invoked, the source is considered to have 
isProcessingBacklog=false.

Review Comment:
   Since we plan to introduce other APIs for a source to set its backlog status 
(e.g. in the source reader), it might be better to say the following:
   
   ```
   If no API has been explicitly invoked to specify the backlog status of a 
source, the source is considered to have isProcessingBacklog=false by default.
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -164,6 +167,15 @@ public void lazyInitialize(
         context.lazyInitialize(
                 globalFailureHandler, mainThreadExecutor, 
operatorCoordinatorMetricGroup);
 
+        OperatorCoordinator rootCoordinator = coordinator;
+        if (coordinator instanceof RecreateOnResetOperatorCoordinator) {
+            rootCoordinator =
+                    ((RecreateOnResetOperatorCoordinator) 
rootCoordinator).getInternalCoordinator();

Review Comment:
   If a `RecreateOnResetOperatorCoordinator` creates a new 
`OperatorCoordinator` instance internally when it is reset to a checkpoint, I 
suppose this code path will not be invoked again, so the newly created 
`OperatorCoordinator` will not have a reference to the checkpoint coordinator. 
Is that OK?
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -2079,11 +2149,19 @@ private final class ScheduledTrigger implements 
Runnable {
 
         @Override
         public void run() {
+            long currentTime = clock.relativeTimeMillis();
+            if (lastCheckpointTriggeringRelativeTime != NO_CHECKPOINT
+                    && currentTime - lastCheckpointTriggeringRelativeTime < 
baseInterval) {
+                return;
+            }
+            lastCheckpointTriggeringRelativeTime = currentTime;
+
             try {
                 triggerCheckpoint(checkpointProperties, null, true);
             } catch (Exception e) {
                 LOG.error("Exception while triggering checkpoint for job {}.", 
job, e);
             }
+            timer.schedule(this, baseInterval, TimeUnit.MILLISECONDS);

Review Comment:
   Does the current implementation ensure the following behavior?
   
   Suppose the last checkpoint was triggered at time T1, and the checkpoint 
interval is then updated to the value X at time T2. The next checkpoint should 
be triggered at time `max(T2, T1 + X)`.
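
A small worked example of the rule above, as a sketch only: `nextTriggerTime` is a hypothetical helper that does not exist in the PR, and all times are plain millisecond values chosen for illustration.

```java
public final class NextTriggerTime {
    /**
     * Hypothetical helper: the time at which the next checkpoint should be triggered
     * when the last one was triggered at t1 and the interval changed to x at t2.
     */
    public static long nextTriggerTime(long t1, long t2, long x) {
        return Math.max(t2, t1 + x);
    }

    public static void main(String[] args) {
        // Interval shortened: T1 + X = 3000 is already in the past at T2 = 5000,
        // so the next checkpoint is due immediately at T2.
        System.out.println(nextTriggerTime(1_000L, 5_000L, 2_000L)); // 5000

        // Interval lengthened: T1 + X = 11000 is still in the future at T2 = 5000,
        // so the next checkpoint waits until T1 + X.
        System.out.println(nextTriggerTime(1_000L, 5_000L, 10_000L)); // 11000
    }
}
```

The two cases cover leaving backlog (shorter interval) and entering backlog (longer interval).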



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -426,6 +462,41 @@ public boolean isShutdown() {
         return shutdown;
     }
 
+    /**
+     * Reports whether a source operator is currently processing backlog.
+     *
+     * <p>If any source operator is processing backlog, the checkpoint 
interval would be decided by
+     * {@code execution.checkpointing.interval-during-backlog} instead of 
{@code
+     * execution.checkpointing.interval}.
+     *
+     * <p>If a source has not invoked this method, the source is considered to 
have
+     * isProcessingBacklog=false. If a source operator has invoked this method 
multiple times, the
+     * last reported value is used.
+     *
+     * @param operatorID the operator ID of the source operator.
+     * @param isProcessingBacklog whether the source operator is processing 
backlog.
+     */
+    public void setIsProcessingBacklog(OperatorID operatorID, boolean 
isProcessingBacklog) {
+        boolean wasBacklog = !backlogOperators.isEmpty();
+        if (isProcessingBacklog) {
+            backlogOperators.add(operatorID);
+        } else {
+            backlogOperators.remove(operatorID);
+        }
+
+        if (wasBacklog && backlogOperators.isEmpty()) {
+            baseInterval = checkpointInterval;

Review Comment:
   Instead of keeping the checkpoint interval in a dedicated variable, would it 
be simpler to add the following method so that we have less (duplicate) state 
to maintain?
   
   ```
   private long getCurrentCheckpointInterval() {
       return backlogOperators.isEmpty() ? checkpointInterval : 
checkpointIntervalDuringBacklog;
   }
   ```
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -222,6 +241,9 @@ public class CheckpointCoordinator {
 
     private final CheckpointPlanCalculator checkpointPlanCalculator;
 
+    /** IDs of the source operators that are currently processing backlog. */
+    private final Set<OperatorID> backlogOperators = new HashSet<>();

Review Comment:
   Will this set be accessed by multiple threads concurrently? If so, do we 
need to make it thread-safe?
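
If concurrent access does turn out to be possible, one option is a concurrent set. The sketch below assumes that is the case. `BacklogTracker` is a hypothetical class, and `String` stands in for `OperatorID` to keep the example self-contained. Note that a concurrent set only makes individual `add`/`remove`/`isEmpty` calls safe. A check-then-act sequence such as the `wasBacklog` comparison in this PR would still need a lock or confinement to a single thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public final class BacklogTracker {
    // String is a stand-in for OperatorID; newKeySet() returns a thread-safe Set.
    private final Set<String> backlogOperators = ConcurrentHashMap.newKeySet();

    /** Records the last reported backlog status for the given operator. */
    public void setIsProcessingBacklog(String operatorId, boolean isProcessingBacklog) {
        if (isProcessingBacklog) {
            backlogOperators.add(operatorId);
        } else {
            backlogOperators.remove(operatorId);
        }
    }

    /** Returns true if at least one operator currently reports backlog. */
    public boolean isAnyProcessingBacklog() {
        return !backlogOperators.isEmpty();
    }

    public static void main(String[] args) {
        BacklogTracker tracker = new BacklogTracker();
        tracker.setIsProcessingBacklog("source-1", true);
        System.out.println(tracker.isAnyProcessingBacklog()); // true
        tracker.setIsProcessingBacklog("source-1", false);
        System.out.println(tracker.isAnyProcessingBacklog()); // false
    }
}
```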



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -150,10 +152,22 @@ public class CheckpointCoordinator {
     private final CheckpointIDCounter checkpointIdCounter;
 
     /**
-     * The base checkpoint interval. Actual trigger time may be affected by 
the max concurrent
-     * checkpoints and minimum-pause values
+     * The checkpoint interval in normal situations. Actual trigger time may 
be affected by the max
+     * concurrent checkpoints, minimum-pause values and checkpoint interval 
during backlog.
      */
-    private final long baseInterval;
+    private final long checkpointInterval;
+
+    /**
+     * The checkpoint interval when any source reports 
isProcessingBacklog=true. Actual trigger time
+     * may be affected by the max concurrent checkpoints and minimum-pause 
values.
+     */
+    private final long checkpointIntervalDuringBacklog;
+
+    /**
+     * The base checkpoint interval. Its value might be either 
checkpointInterval or
+     * checkpointIntervalDuringBacklog depending on whether there is 
processing backlog.
+     */
+    private long baseInterval;

Review Comment:
   Since this value can change to be the same as 
`checkpointIntervalDuringBacklog` depending on the status of the sources, it 
seems more intuitive to name it `currentCheckpointInterval`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -426,6 +462,41 @@ public boolean isShutdown() {
         return shutdown;
     }
 
+    /**
+     * Reports whether a source operator is currently processing backlog.
+     *
+     * <p>If any source operator is processing backlog, the checkpoint 
interval would be decided by
+     * {@code execution.checkpointing.interval-during-backlog} instead of 
{@code
+     * execution.checkpointing.interval}.
+     *
+     * <p>If a source has not invoked this method, the source is considered to 
have
+     * isProcessingBacklog=false. If a source operator has invoked this method 
multiple times, the
+     * last reported value is used.
+     *
+     * @param operatorID the operator ID of the source operator.
+     * @param isProcessingBacklog whether the source operator is processing 
backlog.
+     */
+    public void setIsProcessingBacklog(OperatorID operatorID, boolean 
isProcessingBacklog) {
+        boolean wasBacklog = !backlogOperators.isEmpty();
+        if (isProcessingBacklog) {
+            backlogOperators.add(operatorID);
+        } else {
+            backlogOperators.remove(operatorID);
+        }
+
+        if (wasBacklog && backlogOperators.isEmpty()) {
+            baseInterval = checkpointInterval;
+        } else if (!wasBacklog && !backlogOperators.isEmpty()) {
+            baseInterval = checkpointIntervalDuringBacklog;
+        } else {
+            return;
+        }
+
+        if (baseInterval != Long.MAX_VALUE) {
+            scheduleTriggerWithDelay(getRandomInitDelay());

Review Comment:
   Would it be better (and more consistent with the existing checkpointing 
behavior) to introduce "random initial delay" only once instead of introducing 
it every time the backlog status changes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
