Repository: flink Updated Branches: refs/heads/master 34110fefc -> c9e0761de
[hotfix] [runtime] Guard async recovery operation in try-catch If something fails during a RecoverJobGraph, the logs don't show anything. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c41ee94b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c41ee94b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c41ee94b Branch: refs/heads/master Commit: c41ee94b8a4ab19cafe5ffdacd45562288b0142d Parents: 34110fe Author: Ufuk Celebi <[email protected]> Authored: Fri Mar 11 15:05:53 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Fri Mar 11 15:30:25 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/jobmanager/JobManager.scala | 40 ++++++++++---------- 1 file changed, 21 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c41ee94b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index e658cc8..e94a40c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -393,26 +393,29 @@ class JobManager( case RecoverJob(jobId) => future { - // The ActorRef, which is part of the submitted job graph can only be de-serialized in the - // scope of an actor system. - akka.serialization.JavaSerializer.currentSystem.withValue( - context.system.asInstanceOf[ExtendedActorSystem]) { + try { + // The ActorRef, which is part of the submitted job graph can only be + // de-serialized in the scope of an actor system. + akka.serialization.JavaSerializer.currentSystem.withValue( + context.system.asInstanceOf[ExtendedActorSystem]) { - log.info(s"Attempting to recover job $jobId.") + log.info(s"Attempting to recover job $jobId.") + val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId) - val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId) + submittedJobGraphOption match { + case Some(submittedJobGraph) => + if (!leaderElectionService.hasLeadership()) { + // we've lost leadership. mission: abort. + log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.") + } else { + self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph)) + } - submittedJobGraphOption match { - case Some(submittedJobGraph) => - if (!leaderElectionService.hasLeadership()) { - // we've lost leadership. mission: abort. - log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.") - } - else { - self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph)) - } - case None => log.warn(s"Failed to recover job graph $jobId.") + case None => log.info(s"Attempted to recover job $jobId, but no job graph found.") + } } + } catch { + case t: Throwable => log.error(s"Failed to recover job $jobId.", t) } }(context.dispatcher) @@ -432,8 +435,7 @@ class JobManager( // we've lost leadership. mission: abort. log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " + s"jobs.") - } - else { + } else { log.info(s"Re-submitting ${jobGraphs.size} job graphs.") jobGraphs.foreach{ @@ -443,7 +445,7 @@ class JobManager( } } } catch { - case e: Exception => log.error("Fatal error: Failed to recover jobs.", e) + case t: Throwable => log.error("Fatal error: Failed to recover jobs.", t) } }(context.dispatcher)
