[ 
https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dominik Wosiński updated FLINK-9575:
------------------------------------
    Description: 
When we are removing the _JobGraph_ from _JobManager_ for example after 
invoking _cancel()_, the following code is executed : 
{noformat}
 
val futureOption = currentJobs.get(jobID) match { case Some((eg, _)) => val 
result = if (removeJobFromStateBackend) { val futureOption = Some(future { try 
{ // ...otherwise, we can have lingering resources when there is a concurrent 
shutdown // and the ZooKeeper client is closed. Not removing the job 
immediately allow the // shutdown to release all resources. 
submittedJobGraphs.removeJobGraph(jobID) } catch { case t: Throwable => 
log.warn(s"Could not remove submitted job graph $jobID.", t) } 
}(context.dispatcher)) try { archive ! decorateMessage( ArchiveExecutionGraph( 
jobID, ArchivedExecutionGraph.createFrom(eg))) } catch { case t: Throwable => 
log.warn(s"Could not archive the execution graph $eg.", t) } futureOption } 
else { None } currentJobs.remove(jobID) result case None => None } // remove 
all job-related BLOBs from local and HA store 
libraryCacheManager.unregisterJob(jobID) blobServer.cleanupJob(jobID, 
removeJobFromStateBackend) jobManagerMetricGroup.removeJob(jobID) futureOption }
val futureOption = currentJobs.get(jobID) match {
case Some((eg, _)) =>
val result = if (removeJobFromStateBackend) {
val futureOption = Some(future {
try {
// ...otherwise, we can have lingering resources when there is a concurrent 
shutdown
// and the ZooKeeper client is closed. Not removing the job immediately allow 
the
// shutdown to release all resources.
submittedJobGraphs.removeJobGraph(jobID)
} catch {
case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", 
t)
}
}(context.dispatcher))

try {
archive ! decorateMessage(
ArchiveExecutionGraph(
jobID,
ArchivedExecutionGraph.createFrom(eg)))
} catch {
case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
}

futureOption
} else {
None
}

currentJobs.remove(jobID)

result
case None => None
}

// remove all job-related BLOBs from local and HA store
libraryCacheManager.unregisterJob(jobID)
blobServer.cleanupJob(jobID, removeJobFromStateBackend)

jobManagerMetricGroup.removeJob(jobID)

futureOption
}{noformat}
This causes the asynchronous removal of the job and synchronous removal of blob 
files connected with this jar. This means as far as I understand that there is 
a potential problem that we can fail to remove job graph from 
_submittedJobGraphs._ If the JobManager fails and we elect the new leader it 
can try to recover such job, but it will fail with an exception since the 
assigned blob was already removed.

  was:
When we are removing the _JobGraph_ from _JobManager_ for example after 
invoking _cancel()_, the following code is executed : 
{noformat}
 
val futureOption = currentJobs.get(jobID) match { case Some((eg, _)) => val 
result = if (removeJobFromStateBackend) { val futureOption = Some(future { try 
{ // ...otherwise, we can have lingering resources when there is a concurrent 
shutdown // and the ZooKeeper client is closed. Not removing the job 
immediately allow the // shutdown to release all resources. 
submittedJobGraphs.removeJobGraph(jobID) } catch { case t: Throwable => 
log.warn(s"Could not remove submitted job graph $jobID.", t) } 
}(context.dispatcher)) try { archive ! decorateMessage( ArchiveExecutionGraph( 
jobID, ArchivedExecutionGraph.createFrom(eg))) } catch { case t: Throwable => 
log.warn(s"Could not archive the execution graph $eg.", t) } futureOption } 
else { None } currentJobs.remove(jobID) result case None => None } // remove 
all job-related BLOBs from local and HA store 
libraryCacheManager.unregisterJob(jobID) blobServer.cleanupJob(jobID, 
removeJobFromStateBackend) jobManagerMetricGroup.removeJob(jobID) futureOption }
val futureOption = currentJobs.get(jobID) match {
case Some((eg, _)) =>
val result = if (removeJobFromStateBackend) {
val futureOption = Some(future {
try {
// ...otherwise, we can have lingering resources when there is a concurrent 
shutdown
// and the ZooKeeper client is closed. Not removing the job immediately allow 
the
// shutdown to release all resources.
submittedJobGraphs.removeJobGraph(jobID)
} catch {
case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", 
t)
}
}(context.dispatcher))

try {
archive ! decorateMessage(
ArchiveExecutionGraph(
jobID,
ArchivedExecutionGraph.createFrom(eg)))
} catch {
case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
}

futureOption
} else {
None
}

currentJobs.remove(jobID)

result
case None => None
}

// remove all job-related BLOBs from local and HA store
libraryCacheManager.unregisterJob(jobID)
blobServer.cleanupJob(jobID, removeJobFromStateBackend)

jobManagerMetricGroup.removeJob(jobID)

futureOption
}{noformat}
This causes the asynchronous removal of the job and synchronous removal of blob 
files connected with this jar. This means as far as I understand that there is 
potential problem


> Potential race condition when removing JobGraph in HA
> -----------------------------------------------------
>
>                 Key: FLINK-9575
>                 URL: https://issues.apache.org/jira/browse/FLINK-9575
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Dominik Wosiński
>            Priority: Major
>
> When we are removing the _JobGraph_ from _JobManager_ for example after 
> invoking _cancel()_, the following code is executed : 
> {noformat}
>  
> val futureOption = currentJobs.get(jobID) match { case Some((eg, _)) => val 
> result = if (removeJobFromStateBackend) { val futureOption = Some(future { 
> try { // ...otherwise, we can have lingering resources when there is a 
> concurrent shutdown // and the ZooKeeper client is closed. Not removing the 
> job immediately allow the // shutdown to release all resources. 
> submittedJobGraphs.removeJobGraph(jobID) } catch { case t: Throwable => 
> log.warn(s"Could not remove submitted job graph $jobID.", t) } 
> }(context.dispatcher)) try { archive ! decorateMessage( 
> ArchiveExecutionGraph( jobID, ArchivedExecutionGraph.createFrom(eg))) } catch 
> { case t: Throwable => log.warn(s"Could not archive the execution graph 
> $eg.", t) } futureOption } else { None } currentJobs.remove(jobID) result 
> case None => None } // remove all job-related BLOBs from local and HA store 
> libraryCacheManager.unregisterJob(jobID) blobServer.cleanupJob(jobID, 
> removeJobFromStateBackend) jobManagerMetricGroup.removeJob(jobID) 
> futureOption }
> val futureOption = currentJobs.get(jobID) match {
> case Some((eg, _)) =>
> val result = if (removeJobFromStateBackend) {
> val futureOption = Some(future {
> try {
> // ...otherwise, we can have lingering resources when there is a concurrent 
> shutdown
> // and the ZooKeeper client is closed. Not removing the job immediately allow 
> the
> // shutdown to release all resources.
> submittedJobGraphs.removeJobGraph(jobID)
> } catch {
> case t: Throwable => log.warn(s"Could not remove submitted job graph 
> $jobID.", t)
> }
> }(context.dispatcher))
> try {
> archive ! decorateMessage(
> ArchiveExecutionGraph(
> jobID,
> ArchivedExecutionGraph.createFrom(eg)))
> } catch {
> case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", 
> t)
> }
> futureOption
> } else {
> None
> }
> currentJobs.remove(jobID)
> result
> case None => None
> }
> // remove all job-related BLOBs from local and HA store
> libraryCacheManager.unregisterJob(jobID)
> blobServer.cleanupJob(jobID, removeJobFromStateBackend)
> jobManagerMetricGroup.removeJob(jobID)
> futureOption
> }{noformat}
> This causes the asynchronous removal of the job and synchronous removal of 
> blob files connected with this jar. This means as far as I understand that 
> there is a potential problem that we can fail to remove job graph from 
> _submittedJobGraphs._ If the JobManager fails and we elect the new leader it 
> can try to recover such job, but it will fail with an exception since the 
> assigned blob was already removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to