mohitjain2504 commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1479254174


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java:
##########
@@ -31,19 +35,31 @@
 public class CheckpointCoordinatorDeActivator implements JobStatusListener {
 
     private final CheckpointCoordinator coordinator;
+    private final Map<JobVertexID, ExecutionJobVertex> tasks;
 
-    public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) 
{
+    public CheckpointCoordinatorDeActivator(
+            CheckpointCoordinator coordinator, Map<JobVertexID, 
ExecutionJobVertex> tasks) {
         this.coordinator = checkNotNull(coordinator);
+        this.tasks = checkNotNull(tasks);
     }
 
     @Override
     public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long 
timestamp) {
-        if (newJobStatus == JobStatus.RUNNING) {
-            // start the checkpoint scheduler
+        if (newJobStatus == JobStatus.RUNNING && allTasksOutputNonBlocking()) {
+            // start the checkpoint scheduler if there is no blocking edge
             coordinator.startCheckpointScheduler();
         } else {
             // anything else should stop the trigger for now
             coordinator.stopCheckpointScheduler();
         }
     }
+
+    private boolean allTasksOutputNonBlocking() {
+        for (ExecutionJobVertex vertex : tasks.values()) {

Review Comment:
   can also write it in this manner
   ```
       return tasks.values().stream()
                    .noneMatch(vertex -> 
vertex.getJobVertex().isAnyOutputBlocking());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to