[ https://issues.apache.org/jira/browse/FLINK-4975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15648108#comment-15648108 ]

ASF GitHub Bot commented on FLINK-4975:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2754#discussion_r87034115
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -525,81 +527,63 @@ else if (!props.forceCheckpoint()) {
        }
     
        /**
    -    * Receives a {@link DeclineCheckpoint} message and returns whether the
    -    * message was associated with a pending checkpoint.
    +    * Receives a {@link DeclineCheckpoint} message for a pending checkpoint.
         *
         * @param message Checkpoint decline from the task manager
    -    *
    -    * @return Flag indicating whether the declined checkpoint was associated
    -    * with a pending checkpoint.
         */
    -   public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
    +   public void receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
                if (shutdown || message == null) {
    -                   return false;
    +                   return;
                }
                if (!job.equals(message.getJob())) {
    -                   LOG.error("Received DeclineCheckpoint message for wrong job: {}", message);
    -                   return false;
    +                   throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
    +                           message.getJob() + " while this coordinator handles job " + job);
                }
     
                final long checkpointId = message.getCheckpointId();
    +           final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");
     
                PendingCheckpoint checkpoint;
     
    -           // Flag indicating whether the ack message was for a known pending
    -           // checkpoint.
    -           boolean isPendingCheckpoint;
    -
                synchronized (lock) {
                        // we need to check inside the lock for being shutdown as well, otherwise we
                        // get races and invalid error log messages
                        if (shutdown) {
    -                           return false;
    +                           return;
                        }
     
                        checkpoint = pendingCheckpoints.get(checkpointId);
     
                        if (checkpoint != null && !checkpoint.isDiscarded()) {
    -                           isPendingCheckpoint = true;
    -
    -                           LOG.info("Discarding checkpoint " + checkpointId
    -                                           + " because of checkpoint decline from task " + message.getTaskExecutionId());
    +                           LOG.info("Discarding checkpoint " + checkpointId + " because of checkpoint decline from task " + 
    --- End diff --
    
    A parameterized logging statement using `{}` placeholders would be better here than string concatenation.
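
To illustrate the suggestion: with `{}` placeholders the arguments are passed separately and the message is only assembled when the log level is enabled, instead of being concatenated up front. The sketch below is not Flink code. `ParameterizedLoggingSketch`, `format`, `info`, `sink`, and `infoEnabled` are hypothetical stand-ins that imitate SLF4J-style `{}` substitution so the example runs without a logging library, and the sample values are made up.

```java
import java.util.ArrayList;
import java.util.List;

public class ParameterizedLoggingSketch {

    // Hypothetical stand-in for a log appender: collects the formatted messages.
    static final List<String> sink = new ArrayList<>();

    // Hypothetical stand-in for the logger's INFO level switch.
    static boolean infoEnabled = true;

    // Minimal imitation of `{}` substitution. A real logging library also
    // handles escaping, arrays, and trailing throwables; this sketch does not.
    static String format(String pattern, Object... args) {
        StringBuilder sb = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int idx = pattern.indexOf("{}", from);
            if (idx < 0) {
                break; // more arguments than placeholders: ignore the rest
            }
            sb.append(pattern, from, idx).append(arg);
            from = idx + 2;
        }
        sb.append(pattern.substring(from));
        return sb.toString();
    }

    // The message string is only built when the level is enabled.
    static void info(String pattern, Object... args) {
        if (infoEnabled) {
            sink.add(format(pattern, args));
        }
    }

    public static void main(String[] args) {
        long checkpointId = 42L;          // made-up sample value
        String taskExecutionId = "task-1"; // made-up sample value

        info("Discarding checkpoint {} because of checkpoint decline from task {}",
                checkpointId, taskExecutionId);

        System.out.println(sink.get(0));
    }
}
```

With the coordinator's existing `LOG`, the equivalent call would be along the lines of `LOG.info("Discarding checkpoint {} because of checkpoint decline from task {}", checkpointId, message.getTaskExecutionId());`, plus whatever further arguments the truncated line in the diff goes on to append.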


> Add a limit for how much data may be buffered during checkpoint alignment
> -------------------------------------------------------------------------
>
>                 Key: FLINK-4975
>                 URL: https://issues.apache.org/jira/browse/FLINK-4975
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.1.3
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0, 1.1.4
>
>
> During checkpoint alignment, data may be buffered/spilled.
> We should introduce an upper limit for the spilled data volume. After 
> exceeding that limit, the checkpoint alignment should abort and the 
> checkpoint be canceled.
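
A rough sketch of the behavior described above, assuming the limit is expressed in bytes and checked each time data is buffered. `AlignmentLimitSketch`, `AlignmentBudget`, `tryBuffer`, and `maxBufferedBytes` are hypothetical names for illustration and are not taken from the pull request.

```java
public class AlignmentLimitSketch {

    // Hypothetical tracker for the bytes buffered while one checkpoint is being aligned.
    static final class AlignmentBudget {
        private final long maxBufferedBytes; // assumed: a configured upper limit in bytes
        private long bufferedBytes;

        AlignmentBudget(long maxBufferedBytes) {
            if (maxBufferedBytes <= 0) {
                throw new IllegalArgumentException("limit must be positive");
            }
            this.maxBufferedBytes = maxBufferedBytes;
        }

        // Accounts for newly buffered data. Returns false once the total
        // exceeds the limit, which is the signal to abort the alignment.
        boolean tryBuffer(long numBytes) {
            bufferedBytes += numBytes;
            return bufferedBytes <= maxBufferedBytes;
        }

        // Called when an alignment ends, whether completed or aborted.
        void reset() {
            bufferedBytes = 0;
        }
    }

    public static void main(String[] args) {
        AlignmentBudget budget = new AlignmentBudget(100);
        long[] incoming = {60, 40, 1}; // made-up buffer sizes in bytes

        for (long size : incoming) {
            if (!budget.tryBuffer(size)) {
                System.out.println("Limit exceeded: abort alignment and cancel the checkpoint");
                budget.reset();
                break;
            }
        }
    }
}
```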



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
