Repository: flink Updated Branches: refs/heads/release-1.0 5f35d13be -> a405b55b0
[hotfix] [runtime] Guard async recovery operation in try-catch. If something fails during the asynchronous recovery of a job graph (RecoverJob), the exception is swallowed by the future and the logs don't show anything. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a405b55b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a405b55b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a405b55b Branch: refs/heads/release-1.0 Commit: a405b55b0edc13487e2bd99ede59651a1782d0da Parents: 5f35d13 Author: Ufuk Celebi <[email protected]> Authored: Fri Mar 11 15:05:53 2016 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Fri Mar 11 15:31:07 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/jobmanager/JobManager.scala | 40 ++++++++++---------- 1 file changed, 21 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a405b55b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 1c6fce8..5858171 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -393,26 +393,29 @@ class JobManager( case RecoverJob(jobId) => future { - // The ActorRef, which is part of the submitted job graph can only be de-serialized in the - // scope of an actor system. - akka.serialization.JavaSerializer.currentSystem.withValue( - context.system.asInstanceOf[ExtendedActorSystem]) { + try { + // The ActorRef, which is part of the submitted job graph can only be + // de-serialized in the scope of an actor system. 
+ akka.serialization.JavaSerializer.currentSystem.withValue( + context.system.asInstanceOf[ExtendedActorSystem]) { - log.info(s"Attempting to recover job $jobId.") + log.info(s"Attempting to recover job $jobId.") + val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId) - val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId) + submittedJobGraphOption match { + case Some(submittedJobGraph) => + if (!leaderElectionService.hasLeadership()) { + // we've lost leadership. mission: abort. + log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.") + } else { + self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph)) + } - submittedJobGraphOption match { - case Some(submittedJobGraph) => - if (!leaderElectionService.hasLeadership()) { - // we've lost leadership. mission: abort. - log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.") - } - else { - self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph)) - } - case None => log.warn(s"Failed to recover job graph $jobId.") + case None => log.info(s"Attempted to recover job $jobId, but no job graph found.") + } } + } catch { + case t: Throwable => log.error(s"Failed to recover job $jobId.", t) } }(context.dispatcher) @@ -432,8 +435,7 @@ class JobManager( // we've lost leadership. mission: abort. log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " + s"jobs.") - } - else { + } else { log.info(s"Re-submitting ${jobGraphs.size} job graphs.") jobGraphs.foreach{ @@ -443,7 +445,7 @@ class JobManager( } } } catch { - case e: Exception => log.error("Fatal error: Failed to recover jobs.", e) + case t: Throwable => log.error("Fatal error: Failed to recover jobs.", t) } }(context.dispatcher)
