[FLINK-9873][runtime] Log task state when aborting checkpoint

This closes #6350.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87cb185b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87cb185b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87cb185b Branch: refs/heads/release-1.6 Commit: 87cb185b5d270f1cc6f05756fb25d00c5eaf5ff7 Parents: 29c76ce Author: zentol <ches...@apache.org> Authored: Tue Jul 17 09:34:45 2018 +0200 Committer: zentol <ches...@apache.org> Committed: Mon Jul 23 09:14:50 2018 +0200 ---------------------------------------------------------------------- .../runtime/checkpoint/CheckpointCoordinator.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/87cb185b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 803b2ca..e936b24 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -457,13 +457,20 @@ public class CheckpointCoordinator { Execution[] executions = new Execution[tasksToTrigger.length]; for (int i = 0; i < tasksToTrigger.length; i++) { Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); - if (ee != null && ee.getState() == ExecutionState.RUNNING) { - executions[i] = ee; - } else { + if (ee == null) { LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. 
Aborting checkpoint.", tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job); return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); + } else if (ee.getState() == ExecutionState.RUNNING) { + executions[i] = ee; + } else { + LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.", + tasksToTrigger[i].getTaskNameWithSubtaskIndex(), + job, + ExecutionState.RUNNING, + ee.getState()); + return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } }