Repository: flink Updated Branches: refs/heads/release-1.1 0758d0be6 -> 569a9666f
[FLINK-5197] [jm] Ignore outdated JobStatusChanged messages Outdated JobStatusChanged messages no longer trigger a RemoveJob message but are logged and ignored. This has the advantage that an outdated JobStatusChanged message cannot interfere with a recovered job, which can have the same job id. This closes #2896. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/569a9666 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/569a9666 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/569a9666 Branch: refs/heads/release-1.1 Commit: 569a9666fca9d9113d9fc7f0382faf986afb036f Parents: 0758d0b Author: Till Rohrmann <trohrm...@apache.org> Authored: Tue Nov 29 16:02:29 2016 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Wed Nov 30 13:41:05 2016 +0100 ---------------------------------------------------------------------- .../org/apache/flink/runtime/jobmanager/JobManager.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/569a9666/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 4a4968f..cf60d4e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -746,7 +746,7 @@ class JobManager( } }(context.dispatcher) - case JobStatusChanged(jobID, newJobStatus, timeStamp, error) => + case msg @ JobStatusChanged(jobID, newJobStatus, timeStamp, error) => currentJobs.get(jobID) match { case Some((executionGraph, jobInfo)) => 
executionGraph.getJobName @@ -818,8 +818,7 @@ class JobManager( } }(context.dispatcher) } - case None => - self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true)) + case None => log.debug(s"Received $msg for nonexistent job $jobID.") } case ScheduleOrUpdateConsumers(jobId, partitionId) => @@ -956,7 +955,7 @@ class JobManager( futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) :+ futureToComplete) case None => } - case None => + case None => log.debug(s"Tried to remove nonexistent job $jobID.") } case RemoveCachedJob(jobID) => @@ -1620,7 +1619,7 @@ class JobManager( // shutdown to release all resources. submittedJobGraphs.removeJobGraph(jobID) } catch { - case t: Throwable => log.error(s"Could not remove submitted job graph $jobID.", t) + case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t) } }(context.dispatcher)) @@ -1629,7 +1628,7 @@ class JobManager( archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg)) } catch { - case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " + + case t: Throwable => log.warn(s"Could not prepare the execution graph $eg for " + "archiving.", t) }