mohitjain2504 commented on code in PR #24272:
URL: https://github.com/apache/flink/pull/24272#discussion_r1479254174
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java:
##########
@@ -31,19 +35,31 @@
public class CheckpointCoordinatorDeActivator implements JobStatusListener {
private final CheckpointCoordinator coordinator;
+ private final Map<JobVertexID, ExecutionJobVertex> tasks;
- public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator)
{
+ public CheckpointCoordinatorDeActivator(
+ CheckpointCoordinator coordinator, Map<JobVertexID,
ExecutionJobVertex> tasks) {
this.coordinator = checkNotNull(coordinator);
+ this.tasks = checkNotNull(tasks);
}
@Override
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long
timestamp) {
- if (newJobStatus == JobStatus.RUNNING) {
- // start the checkpoint scheduler
+ if (newJobStatus == JobStatus.RUNNING && allTasksOutputNonBlocking()) {
+ // start the checkpoint scheduler if there is no blocking edge
coordinator.startCheckpointScheduler();
} else {
// anything else should stop the trigger for now
coordinator.stopCheckpointScheduler();
}
}
+
+ private boolean allTasksOutputNonBlocking() {
+ for (ExecutionJobVertex vertex : tasks.values()) {
Review Comment:
can also write it in this manner
```
return tasks.values().stream()
.noneMatch(vertex ->
vertex.getJobVertex().isAnyOutputBlocking());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]