This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fcacc42e17f00cb47c5c16fe75af035f784ae1fa
Author: Yun Tang <[email protected]>
AuthorDate: Mon May 11 13:49:18 2020 +0800

    [FLINK-8871][checkpoint] Support cancelling checkpoints via notification on the
checkpoint coordinator side
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 16 +++++++++++++++
 .../checkpoint/CheckpointFailureManager.java       |  1 +
 .../checkpoint/CheckpointFailureReason.java        |  2 ++
 .../flink/runtime/executiongraph/Execution.java    | 19 ++++++++++++++++++
 .../jobmanager/slots/TaskManagerGateway.java       | 14 +++++++++++++
 .../runtime/jobmaster/RpcTaskManagerGateway.java   |  5 +++++
 .../flink/runtime/taskexecutor/TaskExecutor.java   | 23 +++++++++++++++++++++-
 .../runtime/taskexecutor/TaskExecutorGateway.java  | 11 +++++++++++
 .../checkpoint/CheckpointCoordinatorTest.java      |  7 +++++++
 .../utils/SimpleAckingTaskManagerGateway.java      |  7 +++++++
 .../taskexecutor/TestingTaskExecutorGateway.java   |  5 +++++
 11 files changed, 109 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9e2a4a7..7689f29 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -113,6 +113,7 @@ public class CheckpointCoordinator {
        private final ExecutionVertex[] tasksToWaitFor;
 
        /** Tasks who need to be sent a message when a checkpoint is confirmed. 
*/
+       // TODO currently we use commit vertices to receive "abort checkpoint" 
messages.
        private final ExecutionVertex[] tasksToCommitTo;
 
        /** The operator coordinators that need to be checkpointed. */
@@ -1015,6 +1016,7 @@ public class CheckpointCoordinator {
                                        }
                                });
 
+                               sendAbortedMessages(checkpointId, 
pendingCheckpoint.getCheckpointTimestamp());
                                throw new CheckpointException("Could not 
complete the pending checkpoint " + checkpointId + '.',
                                        
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
                        }
@@ -1067,6 +1069,19 @@ public class CheckpointCoordinator {
                }
        }
 
+       private void sendAbortedMessages(long checkpointId, long timeStamp) {
+               // send notification of aborted checkpoints asynchronously.
+               executor.execute(() -> {
+                       // send the "abort checkpoint" messages to necessary 
vertices.
+                       for (ExecutionVertex ev : tasksToCommitTo) {
+                               Execution ee = ev.getCurrentExecutionAttempt();
+                               if (ee != null) {
+                                       
ee.notifyCheckpointAborted(checkpointId, timeStamp);
+                               }
+                       }
+               });
+       }
+
        /**
         * Fails all pending checkpoints which have not been acknowledged by 
the given execution
         * attempt id.
@@ -1576,6 +1591,7 @@ public class CheckpointCoordinator {
                                                exception, 
pendingCheckpoint.getCheckpointId());
                                }
                        } finally {
+                               
sendAbortedMessages(pendingCheckpoint.getCheckpointId(), 
pendingCheckpoint.getCheckpointTimestamp());
                                
pendingCheckpoints.remove(pendingCheckpoint.getCheckpointId());
                                
rememberRecentCheckpointId(pendingCheckpoint.getCheckpointId());
                                timer.execute(this::executeQueuedRequest);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index 9e162e7..0dc655b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -120,6 +120,7 @@ public class CheckpointFailureManager {
                        case CHECKPOINT_EXPIRED:
                        case TASK_FAILURE:
                        case TASK_CHECKPOINT_FAILURE:
+                       case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE:
                        case TRIGGER_CHECKPOINT_FAILURE:
                        case FINALIZE_CHECKPOINT_FAILURE:
                                //ignore
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
index 023f9bf..cd787d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
@@ -68,6 +68,8 @@ public enum CheckpointFailureReason {
 
        TASK_CHECKPOINT_FAILURE(false, "Task local checkpoint failure."),
 
+       UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE(false, "Unknown task for 
the checkpoint to notify."),
+
        FINALIZE_CHECKPOINT_FAILURE(false, "Failure to finalize checkpoint."),
 
        TRIGGER_CHECKPOINT_FAILURE(false, "Trigger checkpoint failure.");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 7d60a6c..0102ce7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -994,6 +994,25 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        }
 
        /**
+        * Notifies the task of this execution that a checkpoint has been aborted; without an assigned slot this only logs.
+        *
+        * @param abortCheckpointId id of the aborted checkpoint
+        * @param timestamp timestamp of the aborted checkpoint
+        */
+       public void notifyCheckpointAborted(long abortCheckpointId, long timestamp) {
+               final LogicalSlot slot = assignedResource;
+
+               if (slot != null) {
+                       final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
+
+                       taskManagerGateway.notifyCheckpointAborted(attemptId, getVertex().getJobId(), abortCheckpointId, timestamp);
+               } else {
+                       LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
+                               "no longer running.");
+               }
+       }
+
+       /**
         * Trigger a new checkpoint on the task of this execution.
         *
         * @param checkpointId of th checkpoint to trigger
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index da21982..e7cbeae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -120,6 +120,20 @@ public interface TaskManagerGateway extends 
TaskExecutorOperatorEventGateway {
                long timestamp);
 
        /**
+        * Notify the given task about an aborted checkpoint.
+        *
+        * @param executionAttemptID identifying the task
+        * @param jobId identifying the job to which the task belongs
+        * @param checkpointId of the aborted checkpoint
+        * @param timestamp of the aborted checkpoint
+        */
+       void notifyCheckpointAborted(
+               ExecutionAttemptID executionAttemptID,
+               JobID jobId,
+               long checkpointId,
+               long timestamp);
+
+       /**
         * Trigger for the given task a checkpoint.
         *
         * @param executionAttemptID identifying the task
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index 9aec97e..2a20b71 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -93,6 +93,11 @@ public class RpcTaskManagerGateway implements 
TaskManagerGateway {
        }
 
        @Override
+       public void notifyCheckpointAborted(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
+               taskExecutorGateway.abortCheckpoint(executionAttemptID, checkpointId, timestamp); // fire-and-forget: the returned Acknowledge future is dropped (so an unknown-task failure is not observed here) and jobId is not forwarded
+       }
+
+       @Override
        public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, 
JobID jobId, long checkpointId, long timestamp, CheckpointOptions 
checkpointOptions, boolean advanceToEndOfEventTime) {
                taskExecutorGateway.triggerCheckpoint(
                        executionAttemptID,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d74532f..f8438f5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -860,7 +860,28 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        final String message = "TaskManager received a 
checkpoint confirmation for unknown task " + executionAttemptID + '.';
 
                        log.debug(message);
-                       return FutureUtils.completedExceptionally(new 
CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
+                       return FutureUtils.completedExceptionally(new 
CheckpointException(message, 
CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE));
+               }
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> abortCheckpoint(
+                       ExecutionAttemptID executionAttemptID,
+                       long checkpointId,
+                       long checkpointTimestamp) {
+               log.debug("Abort checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
+
+               final Task targetTask = taskSlotTable.getTask(executionAttemptID);
+               // No task registered under this attempt id: report it via an exceptionally completed future.
+               if (targetTask == null) {
+                       final String errorMessage = "TaskManager received an aborted checkpoint for unknown task " + executionAttemptID + '.';
+
+                       log.debug(errorMessage);
+                       return FutureUtils.completedExceptionally(new CheckpointException(errorMessage, CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE));
+               } else {
+                       targetTask.notifyCheckpointAborted(checkpointId);
+
+                       return CompletableFuture.completedFuture(Acknowledge.get());
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index ad37db0..6867451 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -156,6 +156,17 @@ public interface TaskExecutorGateway extends RpcGateway, 
TaskExecutorOperatorEve
        CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID 
executionAttemptID, long checkpointId, long checkpointTimestamp);
 
        /**
+        * Notify the given task that a checkpoint has been aborted. The checkpoint is identified by the
+        * checkpoint ID and the checkpoint timestamp.
+        *
+        * @param executionAttemptID identifying the task
+        * @param checkpointId unique id for the aborted checkpoint
+        * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
+        * @return Future acknowledge if the task has been notified about the aborted checkpoint
+        */
+       CompletableFuture<Acknowledge> abortCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
+
+       /**
         * Cancel the given task.
         *
         * @param executionAttemptID identifying the task
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index ac48106..e799836 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -425,6 +425,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
                        // decline checkpoint from one of the tasks, this 
should cancel the checkpoint
                        coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
+                       verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
+                       verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
+
                        assertTrue(checkpoint1.isDiscarded());
 
                        // validate that we have only one pending checkpoint 
left
@@ -453,6 +456,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
                        coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpoint1Id), TASK_MANAGER_LOCATION_INFO);
                        assertTrue(checkpoint1.isDiscarded());
 
+                       // will not notify abort message again
+                       verify(vertex1.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
+                       verify(vertex2.getCurrentExecutionAttempt(), 
times(1)).notifyCheckpointAborted(eq(checkpoint1Id), any(Long.class));
+
                        coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 7e6ee2d..59aee81 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -129,6 +129,13 @@ public class SimpleAckingTaskManagerGateway implements 
TaskManagerGateway {
                        long timestamp) {}
 
        @Override
+       public void notifyCheckpointAborted(
+                       ExecutionAttemptID executionAttemptID, JobID jobId,
+                       long checkpointId, long timestamp) {
+               // Intentionally empty: this test gateway does not react to aborted checkpoints.
+       }
+
+       @Override
        public void triggerCheckpoint(
                        ExecutionAttemptID executionAttemptID,
                        JobID jobId,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index f7e82d2..2a039c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -169,6 +169,11 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
        }
 
        @Override
+       public CompletableFuture<Acknowledge> abortCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
+               return CompletableFuture.completedFuture(Acknowledge.get()); // always acknowledges and ignores its arguments; unlike cancelTask it is not backed by a configurable function
+       }
+
+       @Override
        public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID 
executionAttemptID, Time timeout) {
                return cancelTaskFunction.apply(executionAttemptID);
        }

Reply via email to