[FLINK-9873][runtime] Log task state when aborting checkpoint

This closes #6350.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87cb185b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87cb185b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87cb185b

Branch: refs/heads/release-1.6
Commit: 87cb185b5d270f1cc6f05756fb25d00c5eaf5ff7
Parents: 29c76ce
Author: zentol <ches...@apache.org>
Authored: Tue Jul 17 09:34:45 2018 +0200
Committer: zentol <ches...@apache.org>
Committed: Mon Jul 23 09:14:50 2018 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointCoordinator.java      | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87cb185b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 803b2ca..e936b24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -457,13 +457,20 @@ public class CheckpointCoordinator {
                Execution[] executions = new Execution[tasksToTrigger.length];
                for (int i = 0; i < tasksToTrigger.length; i++) {
                        Execution ee = 
tasksToTrigger[i].getCurrentExecutionAttempt();
-                       if (ee != null && ee.getState() == 
ExecutionState.RUNNING) {
-                               executions[i] = ee;
-                       } else {
+                       if (ee == null) {
                                LOG.info("Checkpoint triggering task {} of job 
{} is not being executed at the moment. Aborting checkpoint.",
                                                
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                                                job);
                                return new 
CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+                       } else if (ee.getState() == ExecutionState.RUNNING) {
+                               executions[i] = ee;
+                       } else {
+                               LOG.info("Checkpoint triggering task {} of job 
{} is not in state {} but {} instead. Aborting checkpoint.",
+                                               
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
+                                               job,
+                                               ExecutionState.RUNNING,
+                                               ee.getState());
+                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                        }
                }
 

Reply via email to