Repository: flink
Updated Branches:
  refs/heads/master 34110fefc -> c9e0761de


[hotfix] [runtime] Guard async recovery operation in try-catch

If something fails during the asynchronous job recovery (e.g. in
recoverJobGraph), the exception is never logged, so the logs don't
show anything.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c41ee94b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c41ee94b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c41ee94b

Branch: refs/heads/master
Commit: c41ee94b8a4ab19cafe5ffdacd45562288b0142d
Parents: 34110fe
Author: Ufuk Celebi <[email protected]>
Authored: Fri Mar 11 15:05:53 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Fri Mar 11 15:30:25 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   | 40 ++++++++++----------
 1 file changed, 21 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c41ee94b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e658cc8..e94a40c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -393,26 +393,29 @@ class JobManager(
 
     case RecoverJob(jobId) =>
       future {
-        // The ActorRef, which is part of the submitted job graph can only be de-serialized in the
-        // scope of an actor system.
-        akka.serialization.JavaSerializer.currentSystem.withValue(
-          context.system.asInstanceOf[ExtendedActorSystem]) {
+        try {
+          // The ActorRef, which is part of the submitted job graph can only be
+          // de-serialized in the scope of an actor system.
+          akka.serialization.JavaSerializer.currentSystem.withValue(
+            context.system.asInstanceOf[ExtendedActorSystem]) {
 
-          log.info(s"Attempting to recover job $jobId.")
+            log.info(s"Attempting to recover job $jobId.")
+            val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId)
 
-          val submittedJobGraphOption = submittedJobGraphs.recoverJobGraph(jobId)
+            submittedJobGraphOption match {
+              case Some(submittedJobGraph) =>
+                if (!leaderElectionService.hasLeadership()) {
+                  // we've lost leadership. mission: abort.
+                  log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.")
+                } else {
+                  self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
+                }
 
-          submittedJobGraphOption match {
-            case Some(submittedJobGraph) =>
-              if (!leaderElectionService.hasLeadership()) {
-                // we've lost leadership. mission: abort.
-                log.warn(s"Lost leadership during recovery. Aborting recovery of $jobId.")
-              }
-              else {
-                self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
-              }
-            case None => log.warn(s"Failed to recover job graph $jobId.")
+              case None => log.info(s"Attempted to recover job $jobId, but no job graph found.")
+            }
           }
+        } catch {
+          case t: Throwable => log.error(s"Failed to recover job $jobId.", t)
         }
       }(context.dispatcher)
 
@@ -432,8 +435,7 @@ class JobManager(
               // we've lost leadership. mission: abort.
               log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
                 s"jobs.")
-            }
-            else {
+            } else {
               log.info(s"Re-submitting ${jobGraphs.size} job graphs.")
 
               jobGraphs.foreach{
@@ -443,7 +445,7 @@ class JobManager(
             }
           }
         } catch {
-          case e: Exception => log.error("Fatal error: Failed to recover jobs.", e)
+          case t: Throwable => log.error("Fatal error: Failed to recover jobs.", t)
         }
       }(context.dispatcher)
 

Reply via email to