[ https://issues.apache.org/jira/browse/FLINK-13861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16962091#comment-16962091 ]
Biao Liu commented on FLINK-13861:
----------------------------------
Thanks for reporting [~klion26]! It's definitely a critical issue.
I agree with [~pnowojski] that we should not handle this kind of exception.
There might be a resource leak or another unexpected scenario (like skipping
{{triggerQueuedRequests}}) if we tolerate the exception. A big try-catch
wrapping the whole canceller might be a good choice: no statement in the
cancellation should cause an exception here, not only
{{failPendingCheckpoint}}.
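A minimal sketch of the "big try-catch" idea, assuming the canceller stays a plain {{Runnable}}. The helper {{guardCanceller}} and its {{onFailure}} handler are hypothetical (not Flink API): any {{Throwable}} escaping from any statement of the canceller is passed to the handler, which in practice could fail the job, instead of being dropped. If the canceller is run by a {{ScheduledExecutorService}}, as assumed here, an uncaught exception otherwise only ends up in the returned {{ScheduledFuture}}, so it is easy to miss.

{code:java}
import java.util.Objects;
import java.util.function.Consumer;

public class CancellerGuard {

    // Hypothetical helper (not part of Flink): wraps the whole canceller so that
    // an exception from ANY statement, not only failPendingCheckpoint, is caught
    // and escalated to onFailure instead of silently ending the canceller.
    public static Runnable guardCanceller(Runnable canceller, Consumer<Throwable> onFailure) {
        Objects.requireNonNull(canceller);
        Objects.requireNonNull(onFailure);
        return () -> {
            try {
                canceller.run();
            } catch (Throwable t) {
                // Do not swallow: hand the failure to a handler that can, e.g., fail the job.
                onFailure.accept(t);
            }
        };
    }

    public static void main(String[] args) {
        // Simulates a canceller whose checkArgument-style precondition fails.
        Runnable failing = () -> {
            throw new IllegalArgumentException("checkArgument failed");
        };
        guardCanceller(failing, t -> System.out.println("escalated: " + t.getMessage())).run();
    }
}
{code}

The guard only makes the failure visible; deciding what the handler does (fail the job, restart the coordinator) is a separate design choice.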
I'm also interested in the root cause of this issue. I will check the relevant
code later to search for clues.
Regarding the potential conflict, I can take care of that :) So please go ahead
with the fix if you would like to; I can help with the review if you need.
> No new checkpoint will be trigged when canceling an expired checkpoint failed
> -----------------------------------------------------------------------------
>
> Key: FLINK-13861
> URL: https://issues.apache.org/jira/browse/FLINK-13861
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.7.2, 1.8.1, 1.9.0
> Reporter: Congxian Qiu(klion26)
> Priority: Major
> Fix For: 1.10.0
>
>
> I encountered this problem in our private fork of Flink. After taking a look
> at the current master branch of Apache Flink, I think the problem exists there
> as well.
> Problem details:
> 1. A checkpoint is canceled because it expired, so the following canceller
> is called:
> {code:java}
> final Runnable canceller = () -> {
>     synchronized (lock) {
>         // only do the work if the checkpoint is not discarded anyways
>         // note that checkpoint completion discards the pending checkpoint object
>         if (!checkpoint.isDiscarded()) {
>             LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
>             failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
>             pendingCheckpoints.remove(checkpointID);
>             rememberRecentCheckpointId(checkpointID);
>             triggerQueuedRequests();
>         }
>     }
> };{code}
>
> But {{failPendingCheckpoint}} may throw an exception, because of the call chain
> {{CheckpointCoordinator#failPendingCheckpoint}}
> -> {{PendingCheckpoint#abort}}
> -> {{PendingCheckpoint#reportFailedCheckpoint}}
> -> initialization of a {{FailedCheckpointStats}}, whose {{checkArgument}} may
> throw an exception.
> I have not yet found out why the {{checkArgument}} failed (the problem does
> not reproduce frequently); I will create an issue for that if I find more.
>
> 2. When the next checkpoint is triggered, we first check whether there are
> already too many pending checkpoints:
> {code:java}
> private void checkConcurrentCheckpoints() throws CheckpointException {
>     if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
>         triggerRequestQueued = true;
>         if (currentPeriodicTrigger != null) {
>             currentPeriodicTrigger.cancel(false);
>             currentPeriodicTrigger = null;
>         }
>         throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
>     }
> }
> {code}
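> The condition {{pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts}}
> will then always be true, because the expired checkpoint was never removed
> from {{pendingCheckpoints}}.
> 3. From then on, no checkpoint will ever be triggered.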
> Because {{failPendingCheckpoint}} may throw an exception, we could place the
> logic that removes the pending checkpoint in a {{finally}} block.
> I'd like to file a PR for this if it really needs to be fixed.
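To make steps 1 to 3 and the proposed {{finally}} fix concrete, here is a simplified, self-contained model (not Flink code; names only mirror the issue text). It assumes {{maxConcurrentCheckpointAttempts = 1}} and a {{failPendingCheckpoint}} stand-in that always throws {{IllegalArgumentException}}, as {{checkArgument}} would. With the original ordering the expired checkpoint stays in {{pendingCheckpoints}}, so the concurrency check keeps rejecting new triggers; with the removal in a {{finally}} block it does not.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Simplified model (NOT Flink code) of the canceller and the concurrency check.
public class ExpiredCheckpointModel {

    final Map<Long, String> pendingCheckpoints = new HashMap<>();
    final int maxConcurrentCheckpointAttempts;
    final boolean failureThrows;

    ExpiredCheckpointModel(int maxConcurrentCheckpointAttempts, boolean failureThrows) {
        this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
        this.failureThrows = failureThrows;
    }

    // Stands in for failPendingCheckpoint(); throws the way checkArgument would.
    void failPendingCheckpoint(long checkpointId) {
        if (failureThrows) {
            throw new IllegalArgumentException("checkArgument failed for " + checkpointId);
        }
    }

    // Ordering from the issue: remove() is skipped if failPendingCheckpoint throws.
    void cancelExpiredOriginal(long checkpointId) {
        failPendingCheckpoint(checkpointId);
        pendingCheckpoints.remove(checkpointId);
    }

    // Proposed ordering: the removal always runs, even if the failure throws.
    void cancelExpiredWithFinally(long checkpointId) {
        try {
            failPendingCheckpoint(checkpointId);
        } finally {
            pendingCheckpoints.remove(checkpointId);
        }
    }

    // Mirrors the condition in checkConcurrentCheckpoints().
    boolean tooManyConcurrentCheckpoints() {
        return pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts;
    }

    // Expires checkpoint 1 with a throwing failure path; returns whether new triggers are blocked.
    public static boolean blockedAfterExpiry(boolean useFinally) {
        ExpiredCheckpointModel model = new ExpiredCheckpointModel(1, true);
        model.pendingCheckpoints.put(1L, "checkpoint 1");
        try {
            if (useFinally) {
                model.cancelExpiredWithFinally(1L);
            } else {
                model.cancelExpiredOriginal(1L);
            }
        } catch (IllegalArgumentException e) {
            // The exception propagates in both variants; only the cleanup differs.
        }
        return model.tooManyConcurrentCheckpoints();
    }

    public static void main(String[] args) {
        System.out.println("original ordering blocks new checkpoints: " + blockedAfterExpiry(false));
        System.out.println("finally ordering blocks new checkpoints: " + blockedAfterExpiry(true));
    }
}
{code}

Note that {{finally}} only guarantees the removal: the exception still propagates, and {{triggerQueuedRequests}} (not modeled here) would be skipped as well, which is why the comment suggests wrapping the whole canceller.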
--
This message was sent by Atlassian Jira
(v8.3.4#803005)