[
https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515675#comment-16515675
]
Dominik Wosiński commented on FLINK-9575:
-----------------------------------------
Is my understanding right?
> Potential race condition when removing JobGraph in HA
> -----------------------------------------------------
>
> Key: FLINK-9575
> URL: https://issues.apache.org/jira/browse/FLINK-9575
> Project: Flink
> Issue Type: Bug
> Reporter: Dominik Wosiński
> Priority: Major
>
> When we remove the _JobGraph_ from the _JobManager_, for example after
> invoking _cancel()_, the following code is executed:
> {noformat}
> val futureOption = currentJobs.get(jobID) match {
>   case Some((eg, _)) =>
>     val result = if (removeJobFromStateBackend) {
>       val futureOption = Some(future {
>         try {
>           // ...otherwise, we can have lingering resources when there is a concurrent shutdown
>           // and the ZooKeeper client is closed. Not removing the job immediately allow the
>           // shutdown to release all resources.
>           submittedJobGraphs.removeJobGraph(jobID)
>         } catch {
>           case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
>         }
>       }(context.dispatcher))
>       try {
>         archive ! decorateMessage(
>           ArchiveExecutionGraph(
>             jobID,
>             ArchivedExecutionGraph.createFrom(eg)))
>       } catch {
>         case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
>       }
>       futureOption
>     } else {
>       None
>     }
>     currentJobs.remove(jobID)
>     result
>   case None => None
> }
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> }{noformat}
> This removes the job graph asynchronously while the BLOB files associated
> with the job are removed synchronously. As far as I understand, this means
> there is a potential problem: the removal of the job graph from
> _submittedJobGraphs_ can fail after the BLOBs are already gone. If the
> JobManager then fails and a new leader is elected, the new leader can try to
> recover that job, but recovery will fail with an exception because the
> associated BLOBs were already removed.
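If that reading is correct, one way to avoid the race would be to defer the BLOB cleanup until the job-graph removal is known to have succeeded, so a failover can never see a recoverable job graph whose BLOBs are gone. A minimal sketch of that ordering, with hypothetical names (`OrderedCleanup`, plain concurrent maps standing in for the HA store and the blob store; this is not Flink's actual API):

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.collection.concurrent.TrieMap

// Hypothetical stand-ins for the HA job-graph store and the blob store.
object OrderedCleanup {
  val submittedJobGraphs = TrieMap("job-1" -> "graph")
  val blobs = TrieMap("job-1" -> "blob-jar")

  // Remove the job graph asynchronously (as the JobManager does), but chain
  // the BLOB cleanup onto that future instead of running it synchronously,
  // so the BLOBs are only deleted once the graph removal has completed.
  def removeJob(jobID: String): Future[Unit] =
    Future {
      submittedJobGraphs.remove(jobID)
      ()
    }.map { _ =>
      blobs.remove(jobID)
      ()
    }
}
```

With this ordering, a JobManager failure before the future completes leaves both the job graph and its BLOBs in place, so the new leader can still recover the job.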
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)