lindong28 commented on code in PR #22931:
URL: https://github.com/apache/flink/pull/22931#discussion_r1279012662


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -426,6 +456,39 @@ public boolean isShutdown() {
         return shutdown;
     }
 
+    /**
+     * Reports whether a source operator is currently processing backlog.
+     *
+     * <p>If any source operator is processing backlog, the checkpoint interval would be decided by
+     * {@code execution.checkpointing.interval-during-backlog} instead of {@code
+     * execution.checkpointing.interval}.
+     *
+     * <p>If a source has not invoked this method, the source is considered to have
+     * isProcessingBacklog=false. If a source operator has invoked this method multiple times, the
+     * last reported value is used.
+     *
+     * @param operatorID the operator ID of the source operator.
+     * @param isProcessingBacklog whether the source operator is processing backlog.
+     */
+    public void setIsProcessingBacklog(OperatorID operatorID, boolean isProcessingBacklog) {
+        synchronized (lock) {
+            if (isProcessingBacklog) {
+                backlogOperators.add(operatorID);
+            } else {
+                backlogOperators.remove(operatorID);
+            }
+
+            long currentCheckpointInterval = getCurrentCheckpointInterval();
+            if (currentCheckpointInterval != Long.MAX_VALUE) {
+                long currentRelativeTime = clock.relativeTimeMillis();
+                if (currentRelativeTime + currentCheckpointInterval
+                        < nextCheckpointTriggeringRelativeTime) {

Review Comment:
   It would be safer to compute the delay as follows, so that it can never be negative:
   
   long delay = Math.max(0, nextCheckpointTriggeringRelativeTime - clock.relativeTimeMillis());
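A minimal sketch of the bookkeeping the Javadoc above describes (any source in backlog selects the backlog interval; the last report per operator wins), combined with the clamped delay suggested in this comment. This is not Flink's implementation: the class `BacklogIntervalTracker`, its fields, the `String` operator IDs, passing `now` explicitly instead of reading a `Clock`, and pulling the next trigger forward when the new interval is shorter are all assumptions, since the diff is cut off before that logic.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified model of backlog-aware checkpoint scheduling (not Flink code). */
public class BacklogIntervalTracker {
    private final long regularInterval;
    private final long backlogInterval;
    // Operators whose most recent report was isProcessingBacklog == true.
    private final Set<String> backlogOperators = new HashSet<>();
    private long nextTriggerTime;

    public BacklogIntervalTracker(long regularInterval, long backlogInterval, long nextTriggerTime) {
        this.regularInterval = regularInterval;
        this.backlogInterval = backlogInterval;
        this.nextTriggerTime = nextTriggerTime;
    }

    /** The backlog interval applies while at least one source reports backlog. */
    public long currentInterval() {
        return backlogOperators.isEmpty() ? regularInterval : backlogInterval;
    }

    /**
     * Records the latest report for an operator (last value wins) and returns the delay until
     * the next checkpoint trigger. Assumption: Long.MAX_VALUE means "no periodic checkpoint".
     */
    public long setIsProcessingBacklog(String operatorId, boolean isProcessingBacklog, long now) {
        if (isProcessingBacklog) {
            backlogOperators.add(operatorId);
        } else {
            backlogOperators.remove(operatorId);
        }
        long interval = currentInterval();
        if (interval != Long.MAX_VALUE && now + interval < nextTriggerTime) {
            // Assumed behavior: the new interval ends sooner, so pull the next trigger forward.
            nextTriggerTime = now + interval;
        }
        // The reviewer's suggestion: clamp so the delay is never negative, even if the
        // scheduled trigger time has already passed.
        return Math.max(0, nextTriggerTime - now);
    }
}
```

In this model, a trigger time that has already passed yields a delay of 0 (trigger immediately) rather than a negative value that a scheduler might reject.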



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -1081,6 +1082,21 @@ public void notifyEndOfData(ExecutionAttemptID executionAttemptID) {
             if (vertexEndOfDataListener.areAllTasksEndOfData()) {
                 triggerCheckpoint(CheckpointType.CONFIGURED);
             }
+            if (vertexEndOfDataListener.areAllTasksOfJobVertexEndOfData(

Review Comment:
   Would it be better to move this before `triggerCheckpoint(CheckpointType.CONFIGURED)` shown above?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexEndOfDataListener.java:
##########
@@ -51,6 +51,12 @@ public void recordTaskEndOfData(ExecutionAttemptID executionAttemptID) {
         subtaskStatus.set(executionAttemptID.getSubtaskIndex());
     }
 
+    public boolean areAllTasksOfJobVertexEndOfData(JobVertexID jobVertexID) {
+        BitSet subtaskStatus = tasksReachedEndOfData.get(jobVertexID);
+        return subtaskStatus.cardinality()

Review Comment:
   This would cause an NPE, because we remove `jobVertexID` from `tasksReachedEndOfData` once all its subtasks have finished, so `subtaskStatus` can be null here.
   
   Also, since `areAllTasksOfJobVertexEndOfData()` is currently invoked after `areAllTasksEndOfData()`, this code path does not appear to be tested. Can you add a test for this scenario?
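One way to avoid the NPE, sketched under the assumption taken from this comment that a vertex's entry is removed once all its subtasks have finished, so a missing entry is read as "all subtasks reached end of data". Whether that reading is correct depends on when entries are created. The class `EndOfDataTracker`, the `String` vertex IDs, the explicit parallelism map, and `removeFinishedVertex` are hypothetical stand-ins for `VertexEndOfDataListener`, `JobVertexID`, and the execution graph.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical standalone model of per-vertex end-of-data tracking (not Flink code). */
public class EndOfDataTracker {
    private final Map<String, Integer> parallelism = new HashMap<>();
    private final Map<String, BitSet> tasksReachedEndOfData = new HashMap<>();

    public void registerVertex(String vertexId, int vertexParallelism) {
        parallelism.put(vertexId, vertexParallelism);
        tasksReachedEndOfData.put(vertexId, new BitSet(vertexParallelism));
    }

    public void recordTaskEndOfData(String vertexId, int subtaskIndex) {
        BitSet status = tasksReachedEndOfData.get(vertexId);
        if (status != null) {
            status.set(subtaskIndex);
        }
    }

    /** Mirrors the removal described in the review once every subtask has finished. */
    public void removeFinishedVertex(String vertexId) {
        tasksReachedEndOfData.remove(vertexId);
    }

    public boolean areAllTasksOfJobVertexEndOfData(String vertexId) {
        BitSet status = tasksReachedEndOfData.get(vertexId);
        if (status == null) {
            // Assumption: an absent entry means the vertex was removed after all subtasks finished.
            return true;
        }
        return status.cardinality() == parallelism.get(vertexId);
    }
}
```

The checks below exercise the removed-vertex path, which is the scenario the comment asks to cover with a test.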



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java:
##########
@@ -277,6 +278,10 @@ interface Context {
          * concurrent running execution attempts.
          */
         boolean isConcurrentExecutionAttemptsSupported();
+
+        /** Gets the checkpoint coordinator of this job, if snapshot checkpoints are enabled. */

Review Comment:
   Would the following Javadoc be more readable?
   
   Gets the checkpoint coordinator of this job. Returns null if checkpointing is disabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.