[ https://issues.apache.org/jira/browse/FLINK-3261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15114940#comment-15114940 ]

ASF GitHub Bot commented on FLINK-3261:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1537#discussion_r50669903
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -505,6 +506,88 @@ else if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
        }
     
        /**
    +    * Receives a {@link DeclineCheckpoint} message and returns whether the
    +    * message was associated with a pending checkpoint.
    +    *
    +    * @param message Checkpoint decline from the task manager
    +    *
    +    * @return Flag indicating whether the declined checkpoint was associated
    +    * with a pending checkpoint.
    +    */
    +
    +   public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
    +           if (shutdown || message == null) {
    +                   return false;
    +           }
    +           if (!job.equals(message.getJob())) {
    +                   LOG.error("Received DeclineCheckpoint message for wrong job: {}", message);
    +                   return false;
    +           }
    +
    +           final long checkpointId = message.getCheckpointId();
    +
    +           CompletedCheckpoint completed = null;
    +           PendingCheckpoint checkpoint;
    +
    +           // Flag indicating whether the decline message was for a known
    +           // pending checkpoint.
    +           boolean isPendingCheckpoint;
    +
    +           synchronized (lock) {
    +                   // we need to check inside the lock for being shutdown as well, otherwise we
    +                   // get races and invalid error log messages
    +                   if (shutdown) {
    +                           return false;
    +                   }
    +
    +                   checkpoint = pendingCheckpoints.get(checkpointId);
    +
    +                   if (checkpoint != null && !checkpoint.isDiscarded()) {
    +                           isPendingCheckpoint = true;
    +
    +                           LOG.info("Discarding checkpoint " + checkpointId
    +                                   + " because of checkpoint decline from task " + message.getTaskExecutionId());
    +
    +                           pendingCheckpoints.remove(checkpointId);
    +                           checkpoint.discard(userClassLoader);
    +                           rememberRecentCheckpointId(checkpointId);
    +
    +                           boolean haveMoreRecentPending = false;
    +                           Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
    +                           while (entries.hasNext()) {
    +                                   PendingCheckpoint p = entries.next().getValue();
    +                                   if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
    +                                           haveMoreRecentPending = true;
    +                                           break;
    +                                   }
    +                           }
    +                           if (!haveMoreRecentPending && !triggerRequestQueued) {
    +                                   LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
    +                                   triggerCheckpoint(System.currentTimeMillis());
    +                           } else if (!haveMoreRecentPending) {
    +                                   LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
    +                                   triggerQueuedRequests();
    +                           }
    +
    --- End diff --
    
    empty line
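
For reference, the retrigger decision in the hunk above can be modeled in
isolation. The following is a standalone sketch of that logic only; the stub
types and field names mirror the diff but are not Flink's actual classes:

    import java.util.Map;
    import java.util.TreeMap;

    // Standalone model of the decline-handling decision shown in the diff.
    public class DeclineRetriggerSketch {

        // Stub with only the fields the decision needs.
        static final class PendingCheckpoint {
            final long timestamp;
            boolean discarded;
            PendingCheckpoint(long timestamp) { this.timestamp = timestamp; }
        }

        // checkpointId -> pending checkpoint, as in the coordinator
        final Map<Long, PendingCheckpoint> pendingCheckpoints = new TreeMap<>();
        boolean triggerRequestQueued;

        // Returns true if a brand-new checkpoint should be triggered after
        // the checkpoint with the given id was declined and discarded.
        boolean shouldRetriggerAfterDecline(long declinedId) {
            PendingCheckpoint declined = pendingCheckpoints.remove(declinedId);
            if (declined == null || declined.discarded) {
                return false; // unknown or already discarded: nothing to do
            }
            declined.discarded = true;

            // If any remaining pending checkpoint is at least as recent as
            // the declined one, it can still complete; no retrigger needed.
            for (PendingCheckpoint p : pendingCheckpoints.values()) {
                if (!p.discarded && p.timestamp >= declined.timestamp) {
                    return false;
                }
            }
            // Otherwise retrigger now, unless a trigger request is already
            // queued (the diff promotes the queued request in that case).
            return !triggerRequestQueued;
        }
    }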


> Tasks should eagerly report back when they cannot start a checkpoint
> --------------------------------------------------------------------
>
>                 Key: FLINK-3261
>                 URL: https://issues.apache.org/jira/browse/FLINK-3261
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Runtime
>    Affects Versions: 0.10.1
>            Reporter: Stephan Ewen
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> With very fast checkpoint intervals (a few hundred milliseconds), it can 
> happen that a Task is not ready to start a checkpoint by the time it gets 
> the first checkpoint trigger message.
> If some other tasks are already ready and commence a checkpoint, the stream 
> alignment will make the non-participating task wait until the checkpoint 
> expires (default: 10 minutes).
> A simple way to fix this is for tasks to report back when they could not 
> start a checkpoint. The checkpoint coordinator can then abort that 
> checkpoint and unblock the streams by starting a new checkpoint (in which 
> all tasks will participate).
> An optimization would be to send a special "abort checkpoints barrier" that 
> tells the barrier buffers for stream alignment to unblock a checkpoint.
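
A rough sketch of the task-side behavior the issue asks for, using stand-in
types (CoordinatorGateway and the message shape are illustrative, not the
actual Flink runtime API of that version):

    // Illustrative only: a task that eagerly declines a checkpoint trigger
    // it cannot act on, instead of silently dropping it.
    public class EagerDeclineSketch {

        interface CoordinatorGateway {
            void declineCheckpoint(long checkpointId, String taskId, Throwable reason);
            void acknowledgeCheckpoint(long checkpointId, String taskId);
        }

        private final String taskId;
        private final CoordinatorGateway coordinator;
        private volatile boolean running; // task is not ready until running

        EagerDeclineSketch(String taskId, CoordinatorGateway coordinator) {
            this.taskId = taskId;
            this.coordinator = coordinator;
        }

        // Called when a "trigger checkpoint" message arrives at the task.
        void triggerCheckpoint(long checkpointId) {
            if (!running) {
                // Report back immediately so the coordinator can abort the
                // checkpoint and unblock stream alignment, instead of letting
                // the checkpoint time out (default: 10 minutes).
                coordinator.declineCheckpoint(checkpointId, taskId,
                        new IllegalStateException("Task is not ready"));
                return;
            }
            // ... snapshot state, then acknowledge ...
            coordinator.acknowledgeCheckpoint(checkpointId, taskId);
        }
    }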


