[ https://issues.apache.org/jira/browse/FLINK-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708914#comment-15708914 ]

ASF GitHub Bot commented on FLINK-5158:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2873#discussion_r90257889
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -651,64 +651,33 @@ public boolean receiveDeclineMessage(DeclineCheckpoint message) {
         *
         * @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
         */
    -   public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception {
    +   public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
                if (shutdown || message == null) {
                        return false;
                }
                if (!job.equals(message.getJob())) {
    -                   LOG.error("Received AcknowledgeCheckpoint message for wrong job: {}", message);
    +                   LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
                        return false;
                }
     
                final long checkpointId = message.getCheckpointId();
     
    -           CompletedCheckpoint completed = null;
    -           PendingCheckpoint checkpoint;
    -
    -           // Flag indicating whether the ack message was for a known pending
    -           // checkpoint.
    -           boolean isPendingCheckpoint;
    -
                synchronized (lock) {
                        // we need to check inside the lock for being shutdown as well, otherwise we
                        // get races and invalid error log messages
                        if (shutdown) {
                                return false;
                        }
     
    -                   checkpoint = pendingCheckpoints.get(checkpointId);
    +                   final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
     
                        if (checkpoint != null && !checkpoint.isDiscarded()) {
    -                           isPendingCheckpoint = true;
     
                                switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize(), null)) {
                                        case SUCCESS:
                                                // TODO: Give KV-state to the acknowledgeTask method
    --- End diff --
    
    Unrelated, but could you remove this TODO? It has been addressed for 1.2 and probably won't be addressed in 1.1.
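The diff keeps the pattern of checking shutdown once without the lock and again inside synchronized (lock), which the inline comment justifies with races and invalid error log messages. The Java below is a minimal sketch of that double check under simplifying assumptions: the class AckCoordinatorSketch, its methods, and the per-checkpoint ack counter are hypothetical stand-ins rather than Flink's CheckpointCoordinator API, and the shutdown flag is assumed to be volatile so the unlocked read is safe.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified coordinator; not Flink's CheckpointCoordinator. */
public class AckCoordinatorSketch {
    private final Object lock = new Object();
    // checkpointId -> number of task acknowledgements still outstanding
    private final Map<Long, Integer> pendingAcks = new HashMap<>();
    // Assumed volatile so the fast-path read outside the lock sees updates.
    private volatile boolean shutdown;

    public void registerPending(long checkpointId, int expectedAcks) {
        synchronized (lock) {
            pendingAcks.put(checkpointId, expectedAcks);
        }
    }

    public void shutdown() {
        synchronized (lock) {
            shutdown = true;
            pendingAcks.clear();
        }
    }

    /** Returns true if the ack was applied to a known pending checkpoint. */
    public boolean receiveAck(long checkpointId) {
        // Fast path: skip taking the lock once the coordinator is shut down.
        if (shutdown) {
            return false;
        }
        synchronized (lock) {
            // Re-check under the lock: shutdown() may have run between the
            // first check and acquiring the lock.
            if (shutdown) {
                return false;
            }
            Integer remaining = pendingAcks.get(checkpointId);
            if (remaining == null) {
                return false; // unknown or already completed checkpoint
            }
            if (remaining <= 1) {
                pendingAcks.remove(checkpointId); // last outstanding ack received
            } else {
                pendingAcks.put(checkpointId, remaining - 1);
            }
            return true;
        }
    }

    public boolean isPending(long checkpointId) {
        synchronized (lock) {
            return pendingAcks.containsKey(checkpointId);
        }
    }
}
```

The first check is only a shortcut; correctness comes from the second one, because shutdown() sets the flag and clears the pending map under the same lock, so an ack can never be applied to state that shutdown has already torn down.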


> Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-5158
>                 URL: https://issues.apache.org/jira/browse/FLINK-5158
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 1.2.0, 1.1.4
>
>
> The checkpoint coordinator does not properly handle exceptions when trying to
> store completed checkpoints. As a result, completed checkpoints are not
> properly cleaned up and, even worse, the {{CheckpointCoordinator}} might get
> stuck and stop triggering checkpoints.
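One way to keep the coordinator from getting stuck is to release a pending checkpoint's slot before calling the store, and to discard the checkpoint if the store throws. The Java below is a hypothetical sketch of that idea, not the fix merged for FLINK-5158. It assumes the hang comes from a failed checkpoint still occupying a pending slot; CompletedCheckpointStore, Coordinator, and the maxConcurrent limit are illustrative stand-ins, the discarded list stands in for cleaning up the checkpoint's state, and the store failure is rethrown as a checked CheckpointException, in the spirit of the narrowed throws clause in the diff.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; none of these types are Flink's real classes. */
public class StoreFailureSketch {

    /** Checked exception standing in for a coordinator-level checkpoint failure. */
    static class CheckpointException extends Exception {
        CheckpointException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Stand-in for a completed-checkpoint store; a ZooKeeper-backed one may throw. */
    interface CompletedCheckpointStore {
        void addCheckpoint(long checkpointId) throws Exception;
    }

    static class Coordinator {
        private final Object lock = new Object();
        private final Set<Long> pending = new HashSet<>();
        private final List<Long> discarded = new ArrayList<>();
        private final CompletedCheckpointStore store;
        private final int maxConcurrent;

        Coordinator(CompletedCheckpointStore store, int maxConcurrent) {
            this.store = store;
            this.maxConcurrent = maxConcurrent;
        }

        /** Starts a checkpoint unless the concurrency limit is reached. */
        boolean tryTrigger(long checkpointId) {
            synchronized (lock) {
                if (pending.size() >= maxConcurrent) {
                    return false;
                }
                return pending.add(checkpointId);
            }
        }

        /** Called once all tasks have acknowledged the checkpoint. */
        void completePending(long checkpointId) throws CheckpointException {
            synchronized (lock) {
                // Remove first, so the slot is freed whether or not the store succeeds.
                if (!pending.remove(checkpointId)) {
                    return;
                }
                try {
                    store.addCheckpoint(checkpointId);
                } catch (Exception e) {
                    // Clean up instead of leaking the checkpoint; here we only record it.
                    discarded.add(checkpointId);
                    throw new CheckpointException(
                            "Could not add checkpoint " + checkpointId + " to the store", e);
                }
            }
        }

        List<Long> discardedIds() {
            synchronized (lock) {
                return new ArrayList<>(discarded);
            }
        }
    }
}
```

Because the checkpoint leaves the pending set before the store is called, a store failure frees the concurrency slot instead of leaving it occupied, so the next trigger can proceed while the caller still learns about the failure through the checked exception.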



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
