[ 
https://issues.apache.org/jira/browse/FLINK-1529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14318641#comment-14318641
 ] 

ASF GitHub Bot commented on FLINK-1529:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/385#discussion_r24602187
  
    --- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
    @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging {
             log.error(t, "Could not properly unregister job {} form the 
library cache.", jobID)
         }
       }
    -
    -  private def checkJavaVersion(): Unit = {
    -    if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) 
{
    -      log.warning("Warning: Flink is running with Java 6. " +
    -        "Java 6 is not maintained any more by Oracle or the OpenJDK 
community. " +
    -        "Flink currently supports Java 6, but may not in future releases," 
+
    -        " due to the unavailability of bug fixes security patched.")
    -    }
    -  }
     }
     
     object JobManager {
    +  
       import ExecutionMode._
    +
       val LOG = LoggerFactory.getLogger(classOf[JobManager])
    +
       val FAILURE_RETURN_CODE = 1
    +
       val JOB_MANAGER_NAME = "jobmanager"
       val EVENT_COLLECTOR_NAME = "eventcollector"
       val ARCHIVE_NAME = "archive"
       val PROFILER_NAME = "profiler"
     
       def main(args: Array[String]): Unit = {
    +
    +    // startup checks and logging
         EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
    -    val (configuration, executionMode, listeningAddress) = parseArgs(args)
    +    checkJavaVersion()
     
    -      if(SecurityUtils.isSecurityEnabled) {
    +    val (configuration: Configuration,
    +         executionMode: ExecutionMode,
    +         listeningAddress:  Option[(String, Int)]) =
    +    try {
    +      parseArgs(args)
    +    }
    +    catch {
    +      case t: Throwable => {
    +        LOG.error(t.getMessage(), t)
    +        System.exit(FAILURE_RETURN_CODE)
    +        null
    +      }
    +    }
    +
    +    try {
    +      if (SecurityUtils.isSecurityEnabled) {
             LOG.info("Security is enabled. Starting secure JobManager.")
             SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
               override def run(): Unit = {
    -            start(configuration, executionMode, listeningAddress)
    +            runJobManager(configuration, executionMode, listeningAddress)
               }
             })
           } else {
    -        start(configuration, executionMode, listeningAddress)
    +        runJobManager(configuration, executionMode, listeningAddress)
    +      }
    +    }
    +    catch {
    +      case t: Throwable => {
    +        LOG.error("Failed to start JobManager.", t)
    +        System.exit(FAILURE_RETURN_CODE)
           }
    +    }
       }
     
    -  def start(configuration: Configuration, executionMode: ExecutionMode,
    -            listeningAddress : Option[(String, Int)]): Unit = {
    -    val jobManagerSystem = AkkaUtils.createActorSystem(configuration, 
listeningAddress)
     
    -    startActor(Props(new JobManager(configuration) with 
WithWebServer))(jobManagerSystem)
    +  def runJobManager(configuration: Configuration,
    +                    executionMode: ExecutionMode,
    +                    listeningAddress: Option[(String, Int)]) : Unit = {
    +
    +    LOG.info("Starting JobManager")
    +    LOG.debug("Starting JobManager actor system")
     
    -    if(executionMode.equals(LOCAL)){
    -      TaskManager.startActorWithConfiguration("", configuration,
    -        localAkkaCommunication = false, localTaskManagerCommunication = 
true)(jobManagerSystem)
    +    val jobManagerSystem = try {
    +      AkkaUtils.createActorSystem(configuration, listeningAddress)
         }
    +    catch {
    +      case t: Throwable => {
    +        if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
    +          val cause = t.getCause()
    +          if (cause != null && 
t.getCause().isInstanceOf[java.net.BindException]) {
    +            val address = listeningAddress match {
    +              case Some((host, port)) => host + ":" + port
    +              case None => "unknown"
    +            }
     
    -    jobManagerSystem.awaitTermination()
    +            throw new Exception("Unable to create JobManager at address " 
+ address + ": " + cause.getMessage(), t)
    +          }
    +        }
    +        throw new Exception("Could not create JobManager actor system", t)
    +      }
    +    }
    +
    +    try {
    +      LOG.debug("Starting JobManager actor")
    +
    +      startActor(configuration, jobManagerSystem)
    +
    +      if(executionMode.equals(LOCAL)){
    +        LOG.info("Starting embedded TaskManager for JobManager's LOCAL 
mode execution")
    +
    +        TaskManager.startActorWithConfiguration("", configuration,
    +          localAkkaCommunication = false, localTaskManagerCommunication = 
true)(jobManagerSystem)
    +      }
    +
    +      jobManagerSystem.awaitTermination()
    +    }
    +    catch {
    +      case t: Throwable => {
    +        Try(jobManagerSystem.shutdown())
    --- End diff --
    
    Do you want to swallow all potential exceptions of 
```ActorSystem.shutdown``` intentionally?


> Improve JobManager startup robustness
> -------------------------------------
>
>                 Key: FLINK-1529
>                 URL: https://issues.apache.org/jira/browse/FLINK-1529
>             Project: Flink
>          Issue Type: Improvement
>          Components: JobManager
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 0.9
>
>
> Currently, the JobManager is created asynchronously (as an actor). If its 
> initialization fails (for various reasons), the process does not terminate 
> and gives only a vague log message that an actor creation failed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to