[FLINK-9873][runtime] Log task state when aborting checkpoint. This closes #6350.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa770ba6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa770ba6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa770ba6 Branch: refs/heads/release-1.5 Commit: aa770ba6f325b85c7242e535d45a6080d2703232 Parents: 9baca1b Author: zentol <ches...@apache.org> Authored: Tue Jul 17 09:34:45 2018 +0200 Committer: zentol <ches...@apache.org> Committed: Mon Jul 23 09:15:21 2018 +0200 ---------------------------------------------------------------------- .../runtime/checkpoint/CheckpointCoordinator.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/aa770ba6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 55e1ffe..82227cd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -456,13 +456,20 @@ public class CheckpointCoordinator { Execution[] executions = new Execution[tasksToTrigger.length]; for (int i = 0; i < tasksToTrigger.length; i++) { Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); - if (ee != null && ee.getState() == ExecutionState.RUNNING) { - executions[i] = ee; - } else { + if (ee == null) { LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. 
Aborting checkpoint.", tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job); return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); + } else if (ee.getState() == ExecutionState.RUNNING) { + executions[i] = ee; + } else { + LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.", + tasksToTrigger[i].getTaskNameWithSubtaskIndex(), + job, + ExecutionState.RUNNING, + ee.getState()); + return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } }