Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1537#discussion_r50669903
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
    @@ -505,6 +506,88 @@ else if (pendingCheckpoints.size() >= 
maxConcurrentCheckpointAttempts) {
        }
     
        /**
    +    * Receives a {@link DeclineCheckpoint} message and returns whether the
    +    * message was associated with a pending checkpoint.
    +    *
    +    * @param message Checkpoint decline from the task manager
    +    *
    +    * @return Flag indicating whether the declined checkpoint was 
associated
    +    * with a pending checkpoint.
    +    */
    +
    +   public boolean receiveDeclineMessage(DeclineCheckpoint message) throws 
Exception {
    +           if (shutdown || message == null) {
    +                   return false;
    +           }
    +           if (!job.equals(message.getJob())) {
    +                   LOG.error("Received DeclineCheckpoint message for wrong 
job: {}", message);
    +                   return false;
    +           }
    +
    +           final long checkpointId = message.getCheckpointId();
    +
    +           CompletedCheckpoint completed = null;
    +           PendingCheckpoint checkpoint;
    +
    +           // Flag indicating whether the ack message was for a known 
pending
    +           // checkpoint.
    +           boolean isPendingCheckpoint;
    +
    +           synchronized (lock) {
    +                   // we need to check inside the lock for being shutdown 
as well, otherwise we
    +                   // get races and invalid error log messages
    +                   if (shutdown) {
    +                           return false;
    +                   }
    +
    +                   checkpoint = pendingCheckpoints.get(checkpointId);
    +
    +                   if (checkpoint != null && !checkpoint.isDiscarded()) {
    +                           isPendingCheckpoint = true;
    +
    +                           LOG.info("Discarding checkpoint " + checkpointId
    +                                   + " because of checkpoint decline from 
task " + message.getTaskExecutionId());
    +
    +                           pendingCheckpoints.remove(checkpointId);
    +                           checkpoint.discard(userClassLoader);
    +                           rememberRecentCheckpointId(checkpointId);
    +
    +                           boolean haveMoreRecentPending = false;
    +                           Iterator<Map.Entry<Long, PendingCheckpoint>> 
entries = pendingCheckpoints.entrySet().iterator();
    +                           while (entries.hasNext()) {
    +                                   PendingCheckpoint p = 
entries.next().getValue();
    +                                   if (!p.isDiscarded() && 
p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
    +                                           haveMoreRecentPending = true;
    +                                           break;
    +                                   }
    +                           }
    +                           if (!haveMoreRecentPending && 
!triggerRequestQueued) {
    +                                   LOG.info("Triggering new checkpoint 
because of discarded checkpoint " + checkpointId);
    +                                   
triggerCheckpoint(System.currentTimeMillis());
    +                           } else if (!haveMoreRecentPending) {
    +                                   LOG.info("Promoting queued checkpoint 
request because of discarded checkpoint " + checkpointId);
    +                                   triggerQueuedRequests();
    +                           }
    +
    --- End diff --
    
    Nit: unnecessary empty line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to