This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit fcacc42e17f00cb47c5c16fe75af035f784ae1fa Author: Yun Tang <[email protected]> AuthorDate: Mon May 11 13:49:18 2020 +0800 [FLINK-8871][checkpoint] Support to cancel checkpoing via notification on checkpoint coordinator side --- .../runtime/checkpoint/CheckpointCoordinator.java | 16 +++++++++++++++ .../checkpoint/CheckpointFailureManager.java | 1 + .../checkpoint/CheckpointFailureReason.java | 2 ++ .../flink/runtime/executiongraph/Execution.java | 19 ++++++++++++++++++ .../jobmanager/slots/TaskManagerGateway.java | 14 +++++++++++++ .../runtime/jobmaster/RpcTaskManagerGateway.java | 5 +++++ .../flink/runtime/taskexecutor/TaskExecutor.java | 23 +++++++++++++++++++++- .../runtime/taskexecutor/TaskExecutorGateway.java | 11 +++++++++++ .../checkpoint/CheckpointCoordinatorTest.java | 7 +++++++ .../utils/SimpleAckingTaskManagerGateway.java | 7 +++++++ .../taskexecutor/TestingTaskExecutorGateway.java | 5 +++++ 11 files changed, 109 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 9e2a4a7..7689f29 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -113,6 +113,7 @@ public class CheckpointCoordinator { private final ExecutionVertex[] tasksToWaitFor; /** Tasks who need to be sent a message when a checkpoint is confirmed. */ + // TODO currently we use commit vertices to receive "abort checkpoint" messages. private final ExecutionVertex[] tasksToCommitTo; /** The operator coordinators that need to be checkpointed. 
*/ @@ -1015,6 +1016,7 @@ public class CheckpointCoordinator { } }); + sendAbortedMessages(checkpointId, pendingCheckpoint.getCheckpointTimestamp()); throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception); } @@ -1067,6 +1069,19 @@ public class CheckpointCoordinator { } } + private void sendAbortedMessages(long checkpointId, long timeStamp) { + // send notification of aborted checkpoints asynchronously. + executor.execute(() -> { + // send the "abort checkpoint" messages to necessary vertices. + for (ExecutionVertex ev : tasksToCommitTo) { + Execution ee = ev.getCurrentExecutionAttempt(); + if (ee != null) { + ee.notifyCheckpointAborted(checkpointId, timeStamp); + } + } + }); + } + /** * Fails all pending checkpoints which have not been acknowledged by the given execution * attempt id. @@ -1576,6 +1591,7 @@ public class CheckpointCoordinator { exception, pendingCheckpoint.getCheckpointId()); } } finally { + sendAbortedMessages(pendingCheckpoint.getCheckpointId(), pendingCheckpoint.getCheckpointTimestamp()); pendingCheckpoints.remove(pendingCheckpoint.getCheckpointId()); rememberRecentCheckpointId(pendingCheckpoint.getCheckpointId()); timer.execute(this::executeQueuedRequest); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java index 9e162e7..0dc655b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java @@ -120,6 +120,7 @@ public class CheckpointFailureManager { case CHECKPOINT_EXPIRED: case TASK_FAILURE: case TASK_CHECKPOINT_FAILURE: + case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE: case TRIGGER_CHECKPOINT_FAILURE: case FINALIZE_CHECKPOINT_FAILURE: //ignore diff 
--git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java index 023f9bf..cd787d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java @@ -68,6 +68,8 @@ public enum CheckpointFailureReason { TASK_CHECKPOINT_FAILURE(false, "Task local checkpoint failure."), + UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE(false, "Unknown task for the checkpoint to notify."), + FINALIZE_CHECKPOINT_FAILURE(false, "Failure to finalize checkpoint."), TRIGGER_CHECKPOINT_FAILURE(false, "Trigger checkpoint failure."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 7d60a6c..0102ce7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -994,6 +994,25 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } /** + * Notify the task of this execution about an aborted checkpoint. + * + * @param abortCheckpointId of the aborted checkpoint + * @param timestamp of the aborted checkpoint + */ + public void notifyCheckpointAborted(long abortCheckpointId, long timestamp) { + final LogicalSlot slot = assignedResource; + + if (slot != null) { + final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); + + taskManagerGateway.notifyCheckpointAborted(attemptId, getVertex().getJobId(), abortCheckpointId, timestamp); + } else { + LOG.debug("The execution has no slot assigned.
This indicates that the execution is " + "no longer running."); } } + + /** * Trigger a new checkpoint on the task of this execution. * * @param checkpointId of th checkpoint to trigger diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java index da21982..e7cbeae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java @@ -120,6 +120,20 @@ public interface TaskManagerGateway extends TaskExecutorOperatorEventGateway { long timestamp); /** + * Notify the given task about an aborted checkpoint. + * + * @param executionAttemptID identifying the task + * @param jobId identifying the job to which the task belongs + * @param checkpointId of the aborted checkpoint + * @param timestamp of the aborted checkpoint + */ + void notifyCheckpointAborted( + ExecutionAttemptID executionAttemptID, + JobID jobId, + long checkpointId, + long timestamp); + + /** * Trigger for the given task a checkpoint.
* * @param executionAttemptID identifying the task diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java index 9aec97e..2a20b71 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java @@ -93,6 +93,11 @@ public class RpcTaskManagerGateway implements TaskManagerGateway { } @Override + public void notifyCheckpointAborted(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) { + taskExecutorGateway.abortCheckpoint(executionAttemptID, checkpointId, timestamp); + } + + @Override public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) { taskExecutorGateway.triggerCheckpoint( executionAttemptID, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index d74532f..f8438f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -860,7 +860,28 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.'; log.debug(message); - return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE)); + return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE)); + } + } + + @Override + public 
CompletableFuture<Acknowledge> abortCheckpoint( + ExecutionAttemptID executionAttemptID, + long checkpointId, + long checkpointTimestamp) { + log.debug("Abort checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID); + + final Task task = taskSlotTable.getTask(executionAttemptID); + + if (task != null) { + task.notifyCheckpointAborted(checkpointId); + + return CompletableFuture.completedFuture(Acknowledge.get()); + } else { + final String message = "TaskManager received an aborted checkpoint for unknown task " + executionAttemptID + '.'; + + log.debug(message); + return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index ad37db0..6867451 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -156,6 +156,17 @@ public interface TaskExecutorGateway extends RpcGateway, TaskExecutorOperatorEve CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp); /** + * Abort a checkpoint for the given task. The checkpoint is identified by the checkpoint ID + * and the checkpoint timestamp. + * + * @param executionAttemptID identifying the task + * @param checkpointId unique id for the checkpoint + * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated + * @return Future acknowledge if the task has been successfully notified about the aborted checkpoint + */ + CompletableFuture<Acknowledge> abortCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp); + + /** * Cancel the given task.
* * @param executionAttemptID identifying the task diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index ac48106..e799836 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -425,6 +425,9 @@ public class CheckpointCoordinatorTest extends TestLogger { // decline checkpoint from one of the tasks, this should cancel the checkpoint coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO); + verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class)); + verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class)); + assertTrue(checkpoint1.isDiscarded()); // validate that we have only one pending checkpoint left @@ -453,6 +456,10 @@ public class CheckpointCoordinatorTest extends TestLogger { coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id), TASK_MANAGER_LOCATION_INFO); assertTrue(checkpoint1.isDiscarded()); + // will not notify abort message again + verify(vertex1.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class)); + verify(vertex2.getCurrentExecutionAttempt(), times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class)); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java index 7e6ee2d..59aee81 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java @@ -129,6 +129,13 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { long timestamp) {} @Override + public void notifyCheckpointAborted( + ExecutionAttemptID executionAttemptID, + JobID jobId, + long checkpointId, + long timestamp) {} + + @Override public void triggerCheckpoint( ExecutionAttemptID executionAttemptID, JobID jobId, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java index f7e82d2..2a039c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java @@ -169,6 +169,11 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { } @Override + public CompletableFuture<Acknowledge> abortCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) { + return CompletableFuture.completedFuture(Acknowledge.get()); + } + + @Override public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) { return cancelTaskFunction.apply(executionAttemptID); }
