[ https://issues.apache.org/jira/browse/FLINK-1529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14319813#comment-14319813 ]
ASF GitHub Bot commented on FLINK-1529: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/385#discussion_r24653930 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging { log.error(t, "Could not properly unregister job {} form the library cache.", jobID) } } - - private def checkJavaVersion(): Unit = { - if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) { - log.warning("Warning: Flink is running with Java 6. " + - "Java 6 is not maintained any more by Oracle or the OpenJDK community. " + - "Flink currently supports Java 6, but may not in future releases," + - " due to the unavailability of bug fixes security patched.") - } - } } object JobManager { + import ExecutionMode._ + val LOG = LoggerFactory.getLogger(classOf[JobManager]) + val FAILURE_RETURN_CODE = 1 + val JOB_MANAGER_NAME = "jobmanager" val EVENT_COLLECTOR_NAME = "eventcollector" val ARCHIVE_NAME = "archive" val PROFILER_NAME = "profiler" def main(args: Array[String]): Unit = { + + // startup checks and logging EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager") - val (configuration, executionMode, listeningAddress) = parseArgs(args) + checkJavaVersion() - if(SecurityUtils.isSecurityEnabled) { + val (configuration: Configuration, + executionMode: ExecutionMode, + listeningAddress: Option[(String, Int)]) = + try { + parseArgs(args) + } + catch { + case t: Throwable => { + LOG.error(t.getMessage(), t) + System.exit(FAILURE_RETURN_CODE) + null + } + } + + try { + if (SecurityUtils.isSecurityEnabled) { LOG.info("Security is enabled. 
Starting secure JobManager.") SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] { override def run(): Unit = { - start(configuration, executionMode, listeningAddress) + runJobManager(configuration, executionMode, listeningAddress) } }) } else { - start(configuration, executionMode, listeningAddress) + runJobManager(configuration, executionMode, listeningAddress) + } + } + catch { + case t: Throwable => { + LOG.error("Failed to start JobManager.", t) + System.exit(FAILURE_RETURN_CODE) } + } } - def start(configuration: Configuration, executionMode: ExecutionMode, - listeningAddress : Option[(String, Int)]): Unit = { - val jobManagerSystem = AkkaUtils.createActorSystem(configuration, listeningAddress) - startActor(Props(new JobManager(configuration) with WithWebServer))(jobManagerSystem) + def runJobManager(configuration: Configuration, + executionMode: ExecutionMode, + listeningAddress: Option[(String, Int)]) : Unit = { + + LOG.info("Starting JobManager") + LOG.debug("Starting JobManager actor system") - if(executionMode.equals(LOCAL)){ - TaskManager.startActorWithConfiguration("", configuration, - localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem) + val jobManagerSystem = try { + AkkaUtils.createActorSystem(configuration, listeningAddress) } + catch { + case t: Throwable => { + if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) { + val cause = t.getCause() + if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) { + val address = listeningAddress match { + case Some((host, port)) => host + ":" + port + case None => "unknown" + } - jobManagerSystem.awaitTermination() + throw new Exception("Unable to create JobManager at address " + address + ": " + cause.getMessage(), t) + } + } + throw new Exception("Could not create JobManager actor system", t) + } + } + + try { + LOG.debug("Starting JobManager actor") + + startActor(configuration, jobManagerSystem) + + if(executionMode.equals(LOCAL)){ + 
LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode execution") + + TaskManager.startActorWithConfiguration("", configuration, + localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem) + } + + jobManagerSystem.awaitTermination() + } + catch { + case t: Throwable => { + Try(jobManagerSystem.shutdown()) --- End diff -- I'll log them > Improve JobManager startup robustness > ------------------------------------- > > Key: FLINK-1529 > URL: https://issues.apache.org/jira/browse/FLINK-1529 > Project: Flink > Issue Type: Improvement > Components: JobManager > Affects Versions: 0.9 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Fix For: 0.9 > > > Currently, the JobManager is created asynchronously (as an actor). If its > initialization fails (for various reasons), the process does not terminate > and gives only a vague log message that an actor creation failed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)