pnowojski commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363288346
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##########
 @@ -1429,61 +1390,45 @@ public void run() {
                }
        }
 
-       private void failPendingCheckpoint(
-                       final PendingCheckpoint pendingCheckpoint,
-                       final CheckpointFailureReason reason) {
-
-               failPendingCheckpoint(pendingCheckpoint, reason, null);
-       }
-
-       private void failPendingCheckpoint(
-               final PendingCheckpoint pendingCheckpoint,
-               final CheckpointFailureReason reason,
-               @Nullable final Throwable cause) {
-
-               CheckpointException exception = new CheckpointException(reason, 
cause);
-               pendingCheckpoint.abort(reason, cause);
-               failureManager.handleJobLevelCheckpointException(exception, 
pendingCheckpoint.getCheckpointId());
-
-               checkAndResetCheckpointScheduler();
-       }
-
-       private void failPendingCheckpointDueToTaskFailure(
-               final PendingCheckpoint pendingCheckpoint,
-               final CheckpointFailureReason reason,
-               final ExecutionAttemptID executionAttemptID) {
+       private void abortPendingCheckpoint(
+               PendingCheckpoint pendingCheckpoint,
+               CheckpointException exception) {
 
-               failPendingCheckpointDueToTaskFailure(pendingCheckpoint, 
reason, null, executionAttemptID);
+               abortPendingCheckpoint(pendingCheckpoint, exception, null);
        }
 
-       private void failPendingCheckpointDueToTaskFailure(
-                       final PendingCheckpoint pendingCheckpoint,
-                       final CheckpointFailureReason reason,
-                       @Nullable final Throwable cause,
-                       final ExecutionAttemptID executionAttemptID) {
-
-               CheckpointException exception = new CheckpointException(reason, 
cause);
-               pendingCheckpoint.abort(reason, cause);
-               failureManager.handleTaskLevelCheckpointException(exception, 
pendingCheckpoint.getCheckpointId(), executionAttemptID);
+       private void abortPendingCheckpoint(
+               PendingCheckpoint pendingCheckpoint,
+               CheckpointException exception,
+               @Nullable final ExecutionAttemptID executionAttemptID) {
 
-               checkAndResetCheckpointScheduler();
-       }
+               assert(Thread.holdsLock(lock));
 
-       private void checkAndResetCheckpointScheduler() {
-               if (!shutdown && periodicScheduling && currentPeriodicTrigger 
== null) {
-                       synchronized (lock) {
-                               if (pendingCheckpoints.isEmpty() || 
allPendingCheckpointsDiscarded()) {
-                                       triggerRequestQueued = false;
-                                       currentPeriodicTrigger = 
scheduleTriggerWithDelay(getRandomInitDelay());
 
 Review comment:
   ok, it makes sense to me.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to