Github user yanghua commented on a diff in the pull request: https://github.com/apache/flink/pull/6322#discussion_r202013852 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -1759,11 +1759,22 @@ class JobManager( case None => None } - // remove all job-related BLOBs from local and HA store - libraryCacheManager.unregisterJob(jobID) - blobServer.cleanupJob(jobID, removeJobFromStateBackend) + // remove all job-related BLOBs from local and HA store, only if the job was removed correctly + futureOption match { + case Some(future) => future.onComplete{ + case scala.util.Success(_) => { + libraryCacheManager.unregisterJob(jobID) + blobServer.cleanupJob(jobID, removeJobFromStateBackend) + jobManagerMetricGroup.removeJob(jobID) + } + + case scala.util.Failure(_) => + + }(context.dispatcher) + + case None => None + } - jobManagerMetricGroup.removeJob(jobID) --- End diff -- this line can also be removed
---