Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/385#discussion_r24653930 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging { log.error(t, "Could not properly unregister job {} form the library cache.", jobID) } } - - private def checkJavaVersion(): Unit = { - if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) { - log.warning("Warning: Flink is running with Java 6. " + - "Java 6 is not maintained any more by Oracle or the OpenJDK community. " + - "Flink currently supports Java 6, but may not in future releases," + - " due to the unavailability of bug fixes security patched.") - } - } } object JobManager { + import ExecutionMode._ + val LOG = LoggerFactory.getLogger(classOf[JobManager]) + val FAILURE_RETURN_CODE = 1 + val JOB_MANAGER_NAME = "jobmanager" val EVENT_COLLECTOR_NAME = "eventcollector" val ARCHIVE_NAME = "archive" val PROFILER_NAME = "profiler" def main(args: Array[String]): Unit = { + + // startup checks and logging EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager") - val (configuration, executionMode, listeningAddress) = parseArgs(args) + checkJavaVersion() - if(SecurityUtils.isSecurityEnabled) { + val (configuration: Configuration, + executionMode: ExecutionMode, + listeningAddress: Option[(String, Int)]) = + try { + parseArgs(args) + } + catch { + case t: Throwable => { + LOG.error(t.getMessage(), t) + System.exit(FAILURE_RETURN_CODE) + null + } + } + + try { + if (SecurityUtils.isSecurityEnabled) { LOG.info("Security is enabled. 
Starting secure JobManager.") SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] { override def run(): Unit = { - start(configuration, executionMode, listeningAddress) + runJobManager(configuration, executionMode, listeningAddress) } }) } else { - start(configuration, executionMode, listeningAddress) + runJobManager(configuration, executionMode, listeningAddress) + } + } + catch { + case t: Throwable => { + LOG.error("Failed to start JobManager.", t) + System.exit(FAILURE_RETURN_CODE) } + } } - def start(configuration: Configuration, executionMode: ExecutionMode, - listeningAddress : Option[(String, Int)]): Unit = { - val jobManagerSystem = AkkaUtils.createActorSystem(configuration, listeningAddress) - startActor(Props(new JobManager(configuration) with WithWebServer))(jobManagerSystem) + def runJobManager(configuration: Configuration, + executionMode: ExecutionMode, + listeningAddress: Option[(String, Int)]) : Unit = { + + LOG.info("Starting JobManager") + LOG.debug("Starting JobManager actor system") - if(executionMode.equals(LOCAL)){ - TaskManager.startActorWithConfiguration("", configuration, - localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem) + val jobManagerSystem = try { + AkkaUtils.createActorSystem(configuration, listeningAddress) } + catch { + case t: Throwable => { + if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) { + val cause = t.getCause() + if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) { + val address = listeningAddress match { + case Some((host, port)) => host + ":" + port + case None => "unknown" + } - jobManagerSystem.awaitTermination() + throw new Exception("Unable to create JobManager at address " + address + ": " + cause.getMessage(), t) + } + } + throw new Exception("Could not create JobManager actor system", t) + } + } + + try { + LOG.debug("Starting JobManager actor") + + startActor(configuration, jobManagerSystem) + + if(executionMode.equals(LOCAL)){ + 
LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode execution") + + TaskManager.startActorWithConfiguration("", configuration, + localAkkaCommunication = false, localTaskManagerCommunication = true)(jobManagerSystem) + } + + jobManagerSystem.awaitTermination() + } + catch { + case t: Throwable => { + Try(jobManagerSystem.shutdown()) --- End diff -- I'll log them
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like to have it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---