[ 
https://issues.apache.org/jira/browse/FLINK-1529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14319844#comment-14319844
 ] 

ASF GitHub Bot commented on FLINK-1529:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/385#discussion_r24655790
  
    --- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
    @@ -530,54 +515,109 @@ Actor with ActorLogMessages with ActorLogging {
             log.error(t, "Could not properly unregister job {} form the 
library cache.", jobID)
         }
       }
    -
    -  private def checkJavaVersion(): Unit = {
    -    if (System.getProperty("java.version").substring(0, 3).toDouble < 1.7) 
{
    -      log.warning("Warning: Flink is running with Java 6. " +
    -        "Java 6 is not maintained any more by Oracle or the OpenJDK 
community. " +
    -        "Flink currently supports Java 6, but may not in future releases," 
+
    -        " due to the unavailability of bug fixes security patched.")
    -    }
    -  }
     }
     
     object JobManager {
    +  
       import ExecutionMode._
    +
       val LOG = LoggerFactory.getLogger(classOf[JobManager])
    +
       val FAILURE_RETURN_CODE = 1
    +
       val JOB_MANAGER_NAME = "jobmanager"
       val EVENT_COLLECTOR_NAME = "eventcollector"
       val ARCHIVE_NAME = "archive"
       val PROFILER_NAME = "profiler"
     
       def main(args: Array[String]): Unit = {
    +
    +    // startup checks and logging
         EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager")
    -    val (configuration, executionMode, listeningAddress) = parseArgs(args)
    +    checkJavaVersion()
     
    -      if(SecurityUtils.isSecurityEnabled) {
    +    val (configuration: Configuration,
    +         executionMode: ExecutionMode,
    +         listeningAddress:  Option[(String, Int)]) =
    +    try {
    +      parseArgs(args)
    +    }
    +    catch {
    +      case t: Throwable => {
    +        LOG.error(t.getMessage(), t)
    +        System.exit(FAILURE_RETURN_CODE)
    +        null
    --- End diff --
    
    Ah, of course. But we could do something like this to avoid it:
    
    ```
    val parsedArgs = Try(parseArgs(args)).recoverWith{
          case t: Throwable =>
            new scala.util.Failure(new Exception("Could not parse the command 
line arguments", t))
        }
    
        val result = parsedArgs.map{
          case (configuration, executionMode, listeningAddress) =>
            if (SecurityUtils.isSecurityEnabled) {
              LOG.info("Security is enabled. Starting secure JobManager.")
              SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
                override def run(): Unit = {
                  runJobManager(configuration, executionMode, listeningAddress)
                }
              })
            } else {
              runJobManager(configuration, executionMode, listeningAddress)
            }
        }
    
        result.recover{
          case t: Throwable =>
            LOG.error("Failed to start JobManager.", t)
            System.exit(FAILURE_RETURN_CODE)
        }
    ```


> Improve JobManager startup robustness
> -------------------------------------
>
>                 Key: FLINK-1529
>                 URL: https://issues.apache.org/jira/browse/FLINK-1529
>             Project: Flink
>          Issue Type: Improvement
>          Components: JobManager
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 0.9
>
>
> Currently, the JobManager is created asynchronously (as an actor). If its 
> initialization fails (for various reasons), the process does not terminate 
> and gives only a vague log message that an actor creation failed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to