[ 
https://issues.apache.org/jira/browse/FLINK-13861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16962091#comment-16962091
 ] 

Biao Liu commented on FLINK-13861:
----------------------------------

Thanks for reporting [~klion26]! It's definitely a critical issue.

I agree with [~pnowojski] that we should not handle this kind of exception. 
Tolerating it could cause a resource leak or another unexpected scenario (like 
skipping {{triggerQueuedRequests}}). Wrapping the whole canceller in one big 
try-catch might be a good choice. No statement in the cancellation should 
cause an exception here, not just {{failPendingCheckpoint}}. 
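
A minimal sketch of what such a wrapper could look like. The {{guard}} helper 
and the {{onFatalError}} callback are hypothetical names for illustration, not 
existing Flink APIs, and what the callback should actually do with the error 
is left open here.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical helper, not part of Flink: routes any failure of a task to one handler. */
final class GuardedCanceller {

    /** Wraps body so that any Throwable it raises is passed to onFatalError. */
    static Runnable guard(Runnable body, Consumer<Throwable> onFatalError) {
        return () -> {
            try {
                body.run();
            } catch (Throwable t) {
                // Covers every statement of the body, not only one particular call.
                onFatalError.accept(t);
            }
        };
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> reported = new AtomicReference<>();
        Runnable canceller = guard(
            () -> { throw new IllegalArgumentException("simulated failure inside the canceller"); },
            reported::set);
        canceller.run();
        System.out.println("reported: " + reported.get().getMessage());
    }
}
```

The point of the single outer try-catch is that the error is reported in one 
place no matter which statement of the canceller failed.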

I'm also interested in the root cause of this issue. I will check the relevant 
code later to look for clues.

Regarding the potential conflict, I can take care of that :) So please go 
ahead with the fix if you would like to; I can help review it if you need.

> No new checkpoint will be trigged when canceling an expired checkpoint failed
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-13861
>                 URL: https://issues.apache.org/jira/browse/FLINK-13861
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.7.2, 1.8.1, 1.9.0
>            Reporter: Congxian Qiu(klion26)
>            Priority: Major
>             Fix For: 1.10.0
>
>
> I encountered this problem in our private fork of Flink. After taking a look 
> at the current master branch of Apache Flink, I think the problem exists here 
> as well.
> Problem Detail:
>  1. A checkpoint is canceled because it expired, so the canceller below is 
> called:
> {code:java}
> final Runnable canceller = () -> {
>    synchronized (lock) {
>       // only do the work if the checkpoint is not discarded anyways
>       // note that checkpoint completion discards the pending checkpoint object
>       if (!checkpoint.isDiscarded()) {
>          LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
>          failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
>          pendingCheckpoints.remove(checkpointID);
>          rememberRecentCheckpointId(checkpointID);
>          triggerQueuedRequests();
>       }
>    }
> };{code}
>  
>  However, {{failPendingCheckpoint}} may throw an exception, because it calls
> {{CheckpointCoordinator#failPendingCheckpoint}}
> -> {{PendingCheckpoint#abort}}
> -> {{PendingCheckpoint#reportFailedCheckpoint}}
> -> initializes a FailedCheckpointStates, where {{checkArgument}} may throw 
> an exception.
> I have not yet found out why {{checkArgument}} failed (the problem does not 
> reproduce frequently); I will create an issue for that if I have more 
> findings.
>  
> 2. When the next checkpoint is triggered, we first check whether there are 
> already too many pending checkpoints, as below:
> {code:java}
> private void checkConcurrentCheckpoints() throws CheckpointException {
>    if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
>       triggerRequestQueued = true;
>       if (currentPeriodicTrigger != null) {
>          currentPeriodicTrigger.cancel(false);
>          currentPeriodicTrigger = null;
>       }
>       throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
>    }
> }
> {code}
> Since the expired checkpoint was never removed from {{pendingCheckpoints}}, 
> {{pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts}} will always 
> be true.
> 3. No checkpoint will ever be triggered from then on.
>  Because {{failPendingCheckpoint}} may throw an exception, we could place the 
> logic that removes the pending checkpoint in a finally block.
> I'd like to file a PR for this if it really needs fixing.
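
The three steps quoted above, together with the proposed finally block, can be 
modeled with a small self-contained sketch. This is a toy model under stated 
assumptions, not Flink code: the class {{ExpiredCheckpointDemo}}, its methods, 
and the limit of one concurrent checkpoint are all hypothetical, and the 
stand-in for {{failPendingCheckpoint}} always throws to simulate the failed 
{{checkArgument}}.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the scenario above; every name here is hypothetical, not Flink's API. */
final class ExpiredCheckpointDemo {
    static final int MAX_CONCURRENT = 1; // assumed limit for the demo

    final Map<Long, String> pendingCheckpoints = new HashMap<>();
    final boolean cleanupInFinally;

    ExpiredCheckpointDemo(boolean cleanupInFinally) {
        this.cleanupInFinally = cleanupInFinally;
    }

    /** Mirrors the concurrency check: refuse to trigger while the pending map is full. */
    boolean tryTrigger(long checkpointId) {
        if (pendingCheckpoints.size() >= MAX_CONCURRENT) {
            return false;
        }
        pendingCheckpoints.put(checkpointId, "pending");
        return true;
    }

    /** Stand-in for failPendingCheckpoint; always throws to simulate the failure. */
    void failPendingCheckpoint(long checkpointId) {
        throw new IllegalArgumentException("simulated checkArgument failure for " + checkpointId);
    }

    /** Expiration handler, with or without the cleanup in a finally block. */
    void expire(long checkpointId) {
        if (cleanupInFinally) {
            try {
                failPendingCheckpoint(checkpointId);
            } finally {
                pendingCheckpoints.remove(checkpointId); // runs even if the call above throws
            }
        } else {
            failPendingCheckpoint(checkpointId);
            pendingCheckpoints.remove(checkpointId); // never reached when the call above throws
        }
    }

    /** Triggers checkpoint 1, expires it, and reports whether checkpoint 2 can still start. */
    static boolean canTriggerAfterFailedExpiration(boolean cleanupInFinally) {
        ExpiredCheckpointDemo coordinator = new ExpiredCheckpointDemo(cleanupInFinally);
        coordinator.tryTrigger(1L);
        try {
            coordinator.expire(1L);
        } catch (IllegalArgumentException e) {
            // Swallowed here only so that the demo can continue.
        }
        return coordinator.tryTrigger(2L);
    }

    public static void main(String[] args) {
        System.out.println("without finally: " + canTriggerAfterFailedExpiration(false));
        System.out.println("with finally:    " + canTriggerAfterFailedExpiration(true));
    }
}
```

Without the finally block the expired entry stays in the map and the second 
trigger is rejected; with it, the entry is removed and the second trigger 
succeeds. The finally block only restores the bookkeeping; the exception 
itself still propagates to the caller.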



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
