pnowojski commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r363080239
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##########
 @@ -1352,52 +1359,6 @@ public void run() {
                }
        }
 
-       /**
-        * Discards the given pending checkpoint because of the given cause.
-        *
-        * @param pendingCheckpoint to discard
-        * @param cause for discarding the checkpoint
-        * @param executionAttemptID the execution attempt id of the failing 
task.
-        */
-       private void discardCheckpoint(
-               PendingCheckpoint pendingCheckpoint,
-               @Nullable Throwable cause,
-               ExecutionAttemptID executionAttemptID) {
-               assert(Thread.holdsLock(lock));
-               Preconditions.checkNotNull(pendingCheckpoint);
-
-               final long checkpointId = pendingCheckpoint.getCheckpointId();
-
-               LOG.info("Discarding checkpoint {} of job {}.", checkpointId, 
job, cause);
-
-               if (cause == null) {
-                       
failPendingCheckpointDueToTaskFailure(pendingCheckpoint, 
CheckpointFailureReason.CHECKPOINT_DECLINED, executionAttemptID);
-               } else if (cause instanceof CheckpointException) {
-                       CheckpointException exception = (CheckpointException) 
cause;
-                       
failPendingCheckpointDueToTaskFailure(pendingCheckpoint, 
exception.getCheckpointFailureReason(), cause, executionAttemptID);
-               } else {
-                       
failPendingCheckpointDueToTaskFailure(pendingCheckpoint, 
CheckpointFailureReason.JOB_FAILURE, cause, executionAttemptID);
-               }
-
-               rememberRecentCheckpointId(checkpointId);
-
-               // we don't have to schedule another "dissolving" checkpoint 
any more because the
-               // cancellation barriers take care of breaking downstream 
alignments
-               // we only need to make sure that suspended queued requests are 
resumed
-
-               boolean haveMoreRecentPending = false;
-               for (PendingCheckpoint p : pendingCheckpoints.values()) {
-                       if (!p.isDiscarded() && p.getCheckpointId() >= 
pendingCheckpoint.getCheckpointId()) {
-                               haveMoreRecentPending = true;
-                               break;
-                       }
-               }
-
-               if (!haveMoreRecentPending) {
-                       triggerQueuedRequests();
-               }
 
 Review comment:
   Is this `if` check gone now? Is `triggerQueuedRequests()` now always called?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to