rkhachatryan commented on a change in pull request #15466:
URL: https://github.com/apache/flink/pull/15466#discussion_r607699550



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculator.java
##########
@@ -147,11 +147,7 @@ private void checkAllTasksInitiated() throws 
CheckpointException {
      */
     private void checkTasksStarted(List<Execution> toTrigger) throws 
CheckpointException {
         for (Execution execution : toTrigger) {
-            if (execution.getState() == ExecutionState.CREATED
-                    || execution.getState() == ExecutionState.RECOVERING
-                    || execution.getState() == ExecutionState.SCHEDULED
-                    || execution.getState() == ExecutionState.DEPLOYING) {
-
+            if (execution.getState() != ExecutionState.RUNNING) {

Review comment:
       I guess you are referring to this part:
   > Although we would also trigger checkpoint after tasks finished, but the 
finished tasks should not be in the list of tasks to trigger
   
   As I understand it, the following checks is in 
`DefaultCheckpointPlanCalculator` **should not** be changed:
   ```
   if (context.hasFinishedTasks() && !allowCheckpointsAfterTasksFinished) { 
throw exception }
   checkAllTasksInitiated();
   ```
   
   But his one **should**:
   ```
   checkTasksStarted(result.getTasksToTrigger());
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to