[ 
https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dominik Wosiński updated FLINK-9575:
------------------------------------
    Description: 
When we remove the _JobGraph_ from the _JobManager_, for example after 
invoking _cancel()_, the following code is executed: 
{noformat}
 
val futureOption = currentJobs.get(jobID) match { case Some((eg, _)) => val 
result = if (removeJobFromStateBackend) { val futureOption = Some(future { try 
{ // ...otherwise, we can have lingering resources when there is a concurrent 
shutdown // and the ZooKeeper client is closed. Not removing the job 
immediately allow the // shutdown to release all resources. 
submittedJobGraphs.removeJobGraph(jobID) } catch { case t: Throwable => 
log.warn(s"Could not remove submitted job graph $jobID.", t) } 
}(context.dispatcher)) try { archive ! decorateMessage( ArchiveExecutionGraph( 
jobID, ArchivedExecutionGraph.createFrom(eg))) } catch { case t: Throwable => 
log.warn(s"Could not archive the execution graph $eg.", t) } futureOption } 
else { None } currentJobs.remove(jobID) result case None => None } // remove 
all job-related BLOBs from local and HA store 
libraryCacheManager.unregisterJob(jobID) blobServer.cleanupJob(jobID, 
removeJobFromStateBackend) jobManagerMetricGroup.removeJob(jobID) futureOption }
val futureOption = currentJobs.get(jobID) match {
case Some((eg, _)) =>
val result = if (removeJobFromStateBackend) {
val futureOption = Some(future {
try {
// ...otherwise, we can have lingering resources when there is a concurrent 
shutdown
// and the ZooKeeper client is closed. Not removing the job immediately allow 
the
// shutdown to release all resources.
submittedJobGraphs.removeJobGraph(jobID)
} catch {
case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", 
t)
}
}(context.dispatcher))

try {
archive ! decorateMessage(
ArchiveExecutionGraph(
jobID,
ArchivedExecutionGraph.createFrom(eg)))
} catch {
case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
}

futureOption
} else {
None
}

currentJobs.remove(jobID)

result
case None => None
}

// remove all job-related BLOBs from local and HA store
libraryCacheManager.unregisterJob(jobID)
blobServer.cleanupJob(jobID, removeJobFromStateBackend)

jobManagerMetricGroup.removeJob(jobID)

futureOption
}{noformat}

  was:
When we are removing the _JobGraph_ from _JobManager_ for example after 
invoking _cancel()_, the following code is executed : 
{noformat}
 
val futureOption = currentJobs.get(jobID) match { case Some((eg, _)) => val 
result = if (removeJobFromStateBackend) { val futureOption = Some(future { try 
{ // ...otherwise, we can have lingering resources when there is a concurrent 
shutdown // and the ZooKeeper client is closed. Not removing the job 
immediately allow the // shutdown to release all resources. 
submittedJobGraphs.removeJobGraph(jobID) } catch { case t: Throwable => 
log.warn(s"Could not remove submitted job graph $jobID.", t) } 
}(context.dispatcher)) try { archive ! decorateMessage( ArchiveExecutionGraph( 
jobID, ArchivedExecutionGraph.createFrom(eg))) } catch { case t: Throwable => 
log.warn(s"Could not archive the execution graph $eg.", t) } futureOption } 
else { None } currentJobs.remove(jobID) result case None => None } // remove 
all job-related BLOBs from local and HA store 
libraryCacheManager.unregisterJob(jobID) blobServer.cleanupJob(jobID, 
removeJobFromStateBackend) jobManagerMetricGroup.removeJob(jobID) futureOption }
{noformat}


> Potential race condition when removing JobGraph in HA
> -----------------------------------------------------
>
>                 Key: FLINK-9575
>                 URL: https://issues.apache.org/jira/browse/FLINK-9575
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Dominik Wosiński
>            Priority: Major
>
> When we remove the _JobGraph_ from the _JobManager_, for example after 
> invoking _cancel()_, the following code is executed: 
> {noformat}
>  
> val futureOption = currentJobs.get(jobID) match { case Some((eg, _)) => val 
> result = if (removeJobFromStateBackend) { val futureOption = Some(future { 
> try { // ...otherwise, we can have lingering resources when there is a 
> concurrent shutdown // and the ZooKeeper client is closed. Not removing the 
> job immediately allow the // shutdown to release all resources. 
> submittedJobGraphs.removeJobGraph(jobID) } catch { case t: Throwable => 
> log.warn(s"Could not remove submitted job graph $jobID.", t) } 
> }(context.dispatcher)) try { archive ! decorateMessage( 
> ArchiveExecutionGraph( jobID, ArchivedExecutionGraph.createFrom(eg))) } catch 
> { case t: Throwable => log.warn(s"Could not archive the execution graph 
> $eg.", t) } futureOption } else { None } currentJobs.remove(jobID) result 
> case None => None } // remove all job-related BLOBs from local and HA store 
> libraryCacheManager.unregisterJob(jobID) blobServer.cleanupJob(jobID, 
> removeJobFromStateBackend) jobManagerMetricGroup.removeJob(jobID) 
> futureOption }
> val futureOption = currentJobs.get(jobID) match {
> case Some((eg, _)) =>
> val result = if (removeJobFromStateBackend) {
> val futureOption = Some(future {
> try {
> // ...otherwise, we can have lingering resources when there is a concurrent 
> shutdown
> // and the ZooKeeper client is closed. Not removing the job immediately allow 
> the
> // shutdown to release all resources.
> submittedJobGraphs.removeJobGraph(jobID)
> } catch {
> case t: Throwable => log.warn(s"Could not remove submitted job graph 
> $jobID.", t)
> }
> }(context.dispatcher))
> try {
> archive ! decorateMessage(
> ArchiveExecutionGraph(
> jobID,
> ArchivedExecutionGraph.createFrom(eg)))
> } catch {
> case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", 
> t)
> }
> futureOption
> } else {
> None
> }
> currentJobs.remove(jobID)
> result
> case None => None
> }
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> }{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to