lindong28 commented on code in PR #22931:
URL: https://github.com/apache/flink/pull/22931#discussion_r1278817765


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -180,8 +186,25 @@ public class CheckpointCoordinator {
     /** Actor that receives status updates from the execution graph this coordinator works for. */
     private JobStatusListener jobStatusListener;
 
+    /**
+     * The current periodic trigger. Used to deduplicate concurrently scheduled checkpoints if any.
+     */
+    @GuardedBy("lock")
+    private ScheduledTrigger currentPeriodicTrigger;
+
     /** A handle to the current periodic trigger, to cancel it when necessary. */
-    private ScheduledFuture<?> currentPeriodicTrigger;
+    @GuardedBy("lock")
+    private ScheduledFuture<?> currentPeriodicTriggerFuture;
+
+    /**
+     * The timestamp (via {@link Clock#relativeTimeMillis()}) when the next checkpoint will be
+     * triggered.
+     *
+     * <p>If it's value is {@link Long#MAX_VALUE}, it means there is not a next checkpoint
+     * scheduled.
+     */
+    @GuardedBy("lock")
+    private volatile long nextCheckpointTriggeringRelativeTime;

Review Comment:
   Now that this field is always accessed inside `synchronized (lock)`, we can remove the `volatile` modifier here.
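   The reason `volatile` becomes redundant: releasing and then acquiring the same monitor already establishes a happens-before edge, so a write made inside `synchronized (lock)` is visible to the next thread that enters `synchronized (lock)`. The lock also makes read-modify-write updates atomic, which `volatile` alone would not. A minimal standalone sketch of the pattern; `NextTriggerTimeHolder` and its methods are hypothetical illustrations, not Flink code:

```java
// Hypothetical stand-in for a field like nextCheckpointTriggeringRelativeTime.
class NextTriggerTimeHolder {
    private final Object lock = new Object();

    // Long.MAX_VALUE means "no next trigger scheduled", mirroring the Javadoc in the diff.
    // No volatile: every read and write below happens while holding 'lock'.
    private long nextTriggerRelativeTime = Long.MAX_VALUE;

    void schedule(long nowRelativeTime, long delay) {
        synchronized (lock) {
            nextTriggerRelativeTime = nowRelativeTime + delay;
        }
    }

    // Read-modify-write: needs the lock for atomicity; volatile would not be enough here.
    void advance(long interval) {
        synchronized (lock) {
            if (nextTriggerRelativeTime != Long.MAX_VALUE) {
                nextTriggerRelativeTime += interval;
            }
        }
    }

    void cancel() {
        synchronized (lock) {
            nextTriggerRelativeTime = Long.MAX_VALUE;
        }
    }

    long get() {
        synchronized (lock) {
            return nextTriggerRelativeTime;
        }
    }
}
```

   The caveat is that this only holds if *every* access, including reads for metrics or logging, goes through the same lock; a single unsynchronized read would need `volatile` back.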



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -2013,23 +2083,32 @@ private void abortPendingCheckpoints(
 
     private void rescheduleTrigger(long tillNextMillis) {
         cancelPeriodicTrigger();
-        currentPeriodicTrigger = scheduleTriggerWithDelay(tillNextMillis);
+        scheduleTriggerWithDelay(clock.relativeTimeMillis(), tillNextMillis);

Review Comment:
   It seems more readable to invoke `rescheduleTrigger(clock.relativeTimeMillis(), tillNextMillis)` here.
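   One way to read this suggestion is a two-overload shape: the single-argument `rescheduleTrigger` reads the clock once and delegates, and the two-argument version does both the cancellation and the scheduling, so the recorded next-trigger time and the timer delay are derived from the same `now`. A minimal sketch under that assumption; `TriggerScheduler`, the `LongSupplier` clock and the field names are hypothetical, not the PR's code:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch of the rescheduleTrigger(now, delay) refactoring.
class TriggerScheduler {
    private final Object lock = new Object();
    private final ScheduledExecutorService timer;
    private final LongSupplier relativeClock; // stand-in for clock.relativeTimeMillis()
    private final Runnable trigger;

    private ScheduledFuture<?> currentFuture;
    private long nextTriggerRelativeTime = Long.MAX_VALUE;

    TriggerScheduler(ScheduledExecutorService timer, LongSupplier relativeClock, Runnable trigger) {
        this.timer = timer;
        this.relativeClock = relativeClock;
        this.trigger = trigger;
    }

    // Convenience overload: read the clock once, then delegate.
    void rescheduleTrigger(long tillNextMillis) {
        rescheduleTrigger(relativeClock.getAsLong(), tillNextMillis);
    }

    // Cancel the old trigger and schedule a new one; the bookkeeping and the
    // timer delay are both computed from the same currentRelativeTime.
    void rescheduleTrigger(long currentRelativeTime, long tillNextMillis) {
        synchronized (lock) {
            if (currentFuture != null) {
                currentFuture.cancel(false);
            }
            nextTriggerRelativeTime = currentRelativeTime + tillNextMillis;
            currentFuture = timer.schedule(trigger, tillNextMillis, TimeUnit.MILLISECONDS);
        }
    }

    long nextTriggerRelativeTime() {
        synchronized (lock) {
            return nextTriggerRelativeTime;
        }
    }

    ScheduledFuture<?> currentFuture() {
        synchronized (lock) {
            return currentFuture;
        }
    }
}
```

   For example, `new TriggerScheduler(Executors.newSingleThreadScheduledExecutor(), System::currentTimeMillis, task)` wires it to a real timer; passing a fixed `LongSupplier` instead makes the timing deterministic in tests.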



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -328,6 +337,7 @@ public void executionAttemptFailed(
 
                     context.unregisterSourceReader(subtaskId, attemptNumber);
                     context.attemptFailed(subtaskId, attemptNumber);
+                    markSubtaskAsNoData(subtaskId);

Review Comment:
   The logical subtask still has data even if one of its execution attempts (the physical subtask) failed. The failed subtask will be restarted, and `subtasksWithData.add(subtaskId)` will be called again shortly after the restart. Therefore, it seems better to remove this line.
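   The distinction between the logical subtask and its execution attempts can be sketched as a tracker in which a failed attempt is simply a no-op. A minimal sketch, assuming that only a genuine end of input should mark a logical subtask as having no data; `SubtaskDataTracker` and its method names (in particular `onEndOfInput`) are hypothetical:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: tracks *logical* subtask indices that may still emit data.
class SubtaskDataTracker {
    private final Set<Integer> subtasksWithData = new HashSet<>();

    // A (re)started attempt registers its reader: the logical subtask has data.
    void onReaderRegistered(int subtaskId) {
        subtasksWithData.add(subtaskId);
    }

    // Assumption: only a genuine end of input removes the logical subtask.
    void onEndOfInput(int subtaskId) {
        subtasksWithData.remove(subtaskId);
    }

    // Deliberately a no-op: the logical subtask will be restarted and
    // re-register shortly afterwards, so its "has data" state is unchanged.
    void onAttemptFailed(int subtaskId, int attemptNumber) {
    }

    boolean hasData(int subtaskId) {
        return subtasksWithData.contains(subtaskId);
    }

    boolean anySubtaskHasData() {
        return !subtasksWithData.isEmpty();
    }
}
```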



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java:
##########
@@ -422,6 +423,15 @@ public <T> void callAsync(
         public void runInCoordinatorThread(Runnable runnable) {
             realContext.runInCoordinatorThread(runnable);
         }
+
+        @Override
+        public void setIsProcessingBacklog(boolean isProcessingBacklog) {
+            if (sourceIndex < sourceSize - 1) {

Review Comment:
   Would it be simpler to remove the `if` statement here?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -106,6 +108,10 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
     private final SourceCoordinatorContext<SplitT> context;
 
     private final CoordinatorStore coordinatorStore;
+
+    /** Indices of the subtasks that may still has data to emit. Used to set backlog status. */
+    private final Set<Integer> subtasksWithData;

Review Comment:
   Instead of maintaining this state in `SourceCoordinator`, would it be simpler to reuse the existing `VertexEndOfDataListener` and let it inform `CheckpointCoordinator` whether all subtasks of an operator have received end-of-data?
   
   Note that you can use `ExecutionJobVertex#getOperatorIDs` to retrieve the operator IDs of a job vertex.
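   The suggested approach amounts to per-vertex end-of-data bookkeeping that is then translated into per-operator answers through the vertex-to-operator mapping. A minimal sketch of that idea; `EndOfDataTracker`, its methods and the use of plain `String` IDs are hypothetical simplifications and do not reflect the actual `VertexEndOfDataListener` API:

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink's VertexEndOfDataListener API.
class EndOfDataTracker {
    private final Map<String, Integer> parallelismByVertex = new HashMap<>();
    private final Map<String, List<String>> operatorIdsByVertex = new HashMap<>();
    private final Map<String, BitSet> finishedSubtasksByVertex = new HashMap<>();

    void registerVertex(String vertexId, int parallelism, List<String> operatorIds) {
        parallelismByVertex.put(vertexId, parallelism);
        operatorIdsByVertex.put(vertexId, operatorIds);
        finishedSubtasksByVertex.put(vertexId, new BitSet(parallelism));
    }

    // Idempotent: duplicate notifications for the same subtask are harmless.
    void subtaskReachedEndOfData(String vertexId, int subtaskIndex) {
        finishedSubtasksByVertex.get(vertexId).set(subtaskIndex);
    }

    boolean allSubtasksReachedEndOfData(String vertexId) {
        return finishedSubtasksByVertex.get(vertexId).cardinality()
                == parallelismByVertex.get(vertexId);
    }

    // The vertex -> operator lookup plays the role of ExecutionJobVertex#getOperatorIDs.
    boolean operatorReachedEndOfData(String operatorId) {
        for (Map.Entry<String, List<String>> e : operatorIdsByVertex.entrySet()) {
            if (e.getValue().contains(operatorId)) {
                return allSubtasksReachedEndOfData(e.getKey());
            }
        }
        return false; // unknown operator
    }
}
```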



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -150,11 +150,17 @@ public class CheckpointCoordinator {
     private final CheckpointIDCounter checkpointIdCounter;
 
     /**
-     * The base checkpoint interval. Actual trigger time may be affected by the max concurrent
-     * checkpoints and minimum-pause values
+     * The checkpoint interval in normal situations. Actual trigger time may be affected by the max

Review Comment:
   It is not clear what `normal` means.
   
   How about saying this:
   The checkpoint interval when no source is reporting `isProcessingBacklog=true`.
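   The proposed wording implies a simple selection rule: the base interval applies only while no source reports `isProcessingBacklog=true`, and a different interval applies as soon as any source does. A minimal sketch of that rule, assuming a separate backlog interval exists and that sources are keyed by a string ID; `CheckpointIntervalSelector`, `intervalDuringBacklog` and the per-source map are hypothetical:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the semantics proposed for the Javadoc.
class CheckpointIntervalSelector {
    private final long baseInterval;          // used when no source reports backlog
    private final long intervalDuringBacklog; // assumption: a separate interval for backlog
    private final Set<String> sourcesProcessingBacklog = new HashSet<>();

    CheckpointIntervalSelector(long baseInterval, long intervalDuringBacklog) {
        this.baseInterval = baseInterval;
        this.intervalDuringBacklog = intervalDuringBacklog;
    }

    void setIsProcessingBacklog(String sourceId, boolean isProcessingBacklog) {
        if (isProcessingBacklog) {
            sourcesProcessingBacklog.add(sourceId);
        } else {
            sourcesProcessingBacklog.remove(sourceId);
        }
    }

    // Any single source in backlog switches to the backlog interval.
    long getCurrentCheckpointInterval() {
        return sourcesProcessingBacklog.isEmpty() ? baseInterval : intervalDuringBacklog;
    }
}
```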



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -2075,10 +2154,30 @@ public void reportStats(long id, ExecutionAttemptID attemptId, CheckpointMetrics
 
     // ------------------------------------------------------------------------
 
-    private final class ScheduledTrigger implements Runnable {
+    final class ScheduledTrigger implements Runnable {
 
         @Override
         public void run() {
+            synchronized (lock) {
+                if (currentPeriodicTrigger != this) {
+                    // Another periodic trigger has been scheduled but this one
+                    // has not been force cancelled yet.
+                    return;
+                }
+
+                long checkpointInterval = getCurrentCheckpointInterval();
+                if (checkpointInterval != Long.MAX_VALUE) {
+                    nextCheckpointTriggeringRelativeTime =
+                            clock.relativeTimeMillis() + checkpointInterval;

Review Comment:
   Should we use `nextCheckpointTriggeringRelativeTime += checkpointInterval` for consistency with the semantics of `scheduleAtFixedRate()`?
   
   Then we can schedule the runnable with delay = `nextCheckpointTriggeringRelativeTime - clock.relativeTimeMillis()`.
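   The difference between the two formulas shows up when a trigger runs late. Re-anchoring on the current time (fixed-delay) lets every bit of lateness accumulate, while advancing from the previously planned time (fixed-rate) keeps triggers on a stable grid and only shortens the next delay. A minimal sketch; `NextTriggerTime` and its method names are hypothetical, and clamping the delay at zero is an added assumption for the case where the planned time has already passed:

```java
// Hypothetical helper comparing the two ways of computing the next trigger time.
final class NextTriggerTime {
    private NextTriggerTime() {}

    // Current code: re-anchor on the time the trigger actually ran (fixed-delay).
    static long fixedDelay(long nowRelativeTime, long checkpointInterval) {
        return nowRelativeTime + checkpointInterval;
    }

    // Suggested: advance from the previously planned time (fixed-rate).
    static long fixedRate(long previousPlannedRelativeTime, long checkpointInterval) {
        return previousPlannedRelativeTime + checkpointInterval;
    }

    // Delay to pass to the timer; clamp at 0 if the planned time is already past.
    static long delayUntil(long nextRelativeTime, long nowRelativeTime) {
        return Math.max(0L, nextRelativeTime - nowRelativeTime);
    }
}
```

   With an interval of 100 ms and a trigger planned for t=100 that actually runs at t=130, fixed-delay plans the next one for t=230, whereas fixed-rate plans it for t=200 and schedules it with a 70 ms delay. The existing `checkpointInterval != Long.MAX_VALUE` guard still matters with `+=`, since adding to a value near `Long.MAX_VALUE` would overflow.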


