[ 
https://issues.apache.org/jira/browse/FLINK-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708911#comment-15708911
 ] 

ASF GitHub Bot commented on FLINK-5158:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2873#discussion_r90258796
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
    @@ -731,46 +700,100 @@ public boolean 
receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E
     
                                                
discardState(message.getState());
                                }
    +
    +                           return true;
                        }
                        else if (checkpoint != null) {
                                // this should not happen
                                throw new IllegalStateException(
                                                "Received message for discarded 
but non-removed checkpoint " + checkpointId);
                        }
                        else {
    +                           boolean wasPendingCheckpoint;
    +
                                // message is for an unknown checkpoint, or 
comes too late (checkpoint disposed)
                                if 
(recentPendingCheckpoints.contains(checkpointId)) {
    -                                   isPendingCheckpoint = true;
    +                                   wasPendingCheckpoint = true;
                                        LOG.warn("Received late message for now 
expired checkpoint attempt {}.", checkpointId);
                                }
                                else {
                                        LOG.debug("Received message for an 
unknown checkpoint {}.", checkpointId);
    -                                   isPendingCheckpoint = false;
    +                                   wasPendingCheckpoint = false;
                                }
     
                                // try to discard the state so that we don't 
have lingering state lying around
                                discardState(message.getState());
    +
    +                           return wasPendingCheckpoint;
    +                   }
    +           }
    +   }
    +
    +   private void completePendingCheckpoint(PendingCheckpoint 
pendingCheckpoint) throws CheckpointException {
    +           final long checkpointId = pendingCheckpoint.getCheckpointId();
    +           CompletedCheckpoint completedCheckpoint = null;
    +
    +           try {
    +                   completedCheckpoint = 
pendingCheckpoint.finalizeCheckpoint();
    +
    +                   
completedCheckpointStore.addCheckpoint(completedCheckpoint);
    +
    +                   rememberRecentCheckpointId(checkpointId);
    +                   
dropSubsumedCheckpoints(completedCheckpoint.getTimestamp());
    +
    +                   onFullyAcknowledgedCheckpoint(completedCheckpoint);
    +           } catch (Exception exception) {
    +                   // abort the current pending checkpoint if it has not 
been discarded yet
    +                   if(!pendingCheckpoint.isDiscarded()) {
    +                           pendingCheckpoint.discard(userClassLoader);
    +                   }
    +
    +                   if (completedCheckpoint != null) {
    +                           // we failed to store the completed checkpoint. 
Let's clean up
    +                           final CompletedCheckpoint cc = 
completedCheckpoint;
    +
    +                           executor.execute(new Runnable() {
    +                                   @Override
    +                                   public void run() {
    +                                           try {
    +                                                   
cc.discard(userClassLoader);
    +                                           } catch (Exception 
nestedException) {
    +                                                   LOG.warn("Could not 
properly discard completed checkpoint {}.", cc.getCheckpointID(), 
nestedException);
    +                                           }
    +                                   }
    +                           });
                        }
    +
    +                   throw new CheckpointException("Could not complete the 
pending checkpoint " + checkpointId + '.', exception);
    +           } finally {
    +                   pendingCheckpoints.remove(checkpointId);
    +
    +                   triggerQueuedRequests();
    +           }
    +
    +           LOG.info("Completed checkpoint {} (in {} ms).", checkpointId, 
completedCheckpoint.getDuration());
    +
    +           if (LOG.isDebugEnabled()) {
    --- End diff --
    
    While rebasing, you have to make sure to copy the updated string builder here.


> Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-5158
>                 URL: https://issues.apache.org/jira/browse/FLINK-5158
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 1.2.0, 1.1.4
>
>
> The checkpoint coordinator does not properly handle exceptions when trying to 
> store completed checkpoints. As a result, completed checkpoints are not 
> properly cleaned up and, even worse, the {{CheckpointCoordinator}} might get 
> stuck and stop triggering checkpoints.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to