[
https://issues.apache.org/jira/browse/FLINK-13497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16900732#comment-16900732
]
Piotr Nowojski commented on FLINK-13497:
----------------------------------------
As far as we (me, [~carp84] and [~StephanEwen]) understand this, the root problem
here is a race condition on the JobManager between failing a checkpoint and
completing another checkpoint, caused by the {{FailJobCallback}} held in
{{CheckpointFailureManager::failureCallback}} executing an asynchronous operation:
{code:java}
cause -> getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause))
{code}
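For context, this callback is created in {{ExecutionGraph}} and handed to the {{CheckpointFailureManager}}, which invokes it once it decides the job should fail; a paraphrased sketch (not the exact 1.9 source):
{code:java}
// Paraphrased sketch: ExecutionGraph wires the fail-job callback into the
// CheckpointFailureManager. Note that the callback does not fail the job directly,
// it only enqueues failGlobal() onto the JobMaster's main thread executor.
CheckpointFailureManager failureManager = new CheckpointFailureManager(
    tolerableCheckpointFailureNumber,
    cause -> getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause)));
{code}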
Possible race condition between:
Decline checkpoint
# JM receives {{JobMaster#declineCheckpoint}}
# In {{LegacyScheduler#declineCheckpoint}} we asynchronously schedule
{{checkpointCoordinator.receiveDeclineMessage}} on {{ioExecutor}}
# This eventually reaches
{{CheckpointFailureManager#handleCheckpointException}}, which in turn decides to
asynchronously fail the job ({{ExecutionGraph#failGlobal}} scheduled on
{{getJobMasterMainThreadExecutor}}).
# {{failGlobal}} cancels all pending checkpoints
Acknowledge checkpoint
# JM receives {{JobMaster#acknowledgeCheckpoint}}
# In {{LegacyScheduler#acknowledgeCheckpoint}} we asynchronously schedule
{{checkpointCoordinator.receiveAcknowledgeMessage}} on {{ioExecutor}}
# This completes some pending checkpoint
If "Acknowledge checkpoint" path executes and completes while {{failGlobal}}
from "Decline checkpoint" step 4 is awaiting execution, this lead to a
completed checkpoint AFTER we failed a job.
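A minimal, self-contained sketch of this interleaving, with plain executors standing in for the {{ioExecutor}} and the JobMaster's main thread executor (all names here are illustrative, not Flink API):
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class CheckpointRaceSketch {

    public static void main(String[] args) {
        // Stand-ins for the JobMaster's main thread executor and the ioExecutor.
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();

        AtomicBoolean jobFailed = new AtomicBoolean(false);
        AtomicBoolean checkpointCompleted = new AtomicBoolean(false);

        // "Decline checkpoint" path: the failure manager decides to fail the job,
        // but only *enqueues* failGlobal() onto the main thread executor.
        ioExecutor.execute(() ->
            mainThreadExecutor.execute(() -> {
                jobFailed.set(true); // failGlobal(): cancels pending checkpoints
                System.out.println(
                    "failGlobal ran; checkpoint already completed = " + checkpointCompleted.get());
            }));

        // "Acknowledge checkpoint" path: runs on the ioExecutor and may finalize a
        // pending checkpoint while failGlobal() is still queued, i.e. after the
        // failure decision has already been made.
        ioExecutor.execute(() -> {
            if (!jobFailed.get()) {
                checkpointCompleted.set(true); // receiveAcknowledgeMessage() completes it
            }
        });

        mainThreadExecutor.shutdown();
        ioExecutor.shutdown();
    }
}
{code}
Whether {{checkpointCompleted}} ends up true depends purely on scheduling, which is exactly the window described above.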
There might be more race conditions of this kind introduced by this
{{failureCallback}}.
I'm not sure if there is an easy fix for that, because the whole design of
{{CheckpointCoordinator}} seems strange: for example, why are {{checkpointCoordinator}}
methods executed on the {{ioExecutor}}? I think the proper solution would
need to clean up the threading model here.
{{CheckpointCoordinator}} should be mostly single threaded, executed only on the
JobMaster's main thread executor (not on the {{ioExecutor}}). The {{ioExecutor}}
should only be used for IO operations, like deleting/moving/touching files.
The same applies to {{CheckpointFailureManager}}. That should remove/limit the
concurrency issues.
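To make that concrete, a rough sketch of the threading model I have in mind (illustrative names, not the actual fix):
{code:java}
import java.util.concurrent.Executor;

// Sketch of the proposed threading model (names are assumptions): all coordinator
// state transitions happen on the JobMaster's main thread executor, and only the
// blocking IO is handed off to the ioExecutor.
class SingleThreadedCoordinatorSketch {

    private final Executor mainThreadExecutor; // single-threaded, owns all coordinator state
    private final Executor ioExecutor;         // only for IO: deleting/moving/touching files

    SingleThreadedCoordinatorSketch(Executor mainThreadExecutor, Executor ioExecutor) {
        this.mainThreadExecutor = mainThreadExecutor;
        this.ioExecutor = ioExecutor;
    }

    void receiveAcknowledgeMessage(Object acknowledgeMessage) {
        mainThreadExecutor.execute(() -> {
            // update the pending checkpoint bookkeeping here, single threaded ...
            ioExecutor.execute(() -> {
                // ... and only the actual storage operations (finalizing / deleting
                // checkpoint files) run on the ioExecutor
            });
        });
    }
}
{code}
With that, failing the job and completing a checkpoint can no longer interleave, because both state changes run on the same single-threaded executor.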
A minor point for refactoring is that we could probably cut the cyclic
dependency between {{CheckpointFailureManager}} and {{CheckpointCoordinator}}
by removing {{CheckpointFailureManager::failureCallback}} and changing, for
example,
{code:java}
void CheckpointFailureManager::handleCheckpointException(ex)
{code}
to
{code:java}
// returns true if the exception should fail the job
boolean CheckpointFailureManager::shouldCheckpointExceptionFailJob(ex)
{code}
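The call site in the coordinator could then look roughly like this (a sketch only; the surrounding names are assumptions):
{code:java}
// Sketch (assumed names): the coordinator asks the failure manager for a decision
// and fails the job itself, which removes the callback-based cycle.
if (checkpointFailureManager.shouldCheckpointExceptionFailJob(exception)) {
    // in the cleaned-up threading model this already runs on the JobMaster's main thread
    executionGraph.failGlobal(exception);
}
{code}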
However, as far as we know, this is not a blocker issue, although it might be
surprising that a checkpoint can complete after the job has failed. For now, I'm
removing the fix for this from the 1.9 release. I hope that this issue is not the
tip of an iceberg and that we are not missing some other bugs/problems here.
> Checkpoints can complete after CheckpointFailureManager fails job
> -----------------------------------------------------------------
>
> Key: FLINK-13497
> URL: https://issues.apache.org/jira/browse/FLINK-13497
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.9.0, 1.10.0
> Reporter: Till Rohrmann
> Priority: Critical
> Fix For: 1.9.0
>
>
> I think that we introduced with FLINK-12364 an inconsistency wrt job
> termination and checkpointing. In FLINK-9900 it was discovered that checkpoints
> can complete even after the {{CheckpointFailureManager}} decided to fail a
> job. I think the expected behaviour should be that we fail all pending
> checkpoints once the {{CheckpointFailureManager}} decides to fail the job.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)