[ 
https://issues.apache.org/jira/browse/FLINK-1529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14318657#comment-14318657
 ] 

ASF GitHub Bot commented on FLINK-1529:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/385#discussion_r24602575
  
    --- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
    @@ -653,11 +693,87 @@ object JobManager {
         (archiveCount, profilingEnabled, cleanupInterval, executionRetries, 
delayBetweenRetries)
       }
     
    -  def startActor(configuration: Configuration)(implicit actorSystem: 
ActorSystem): ActorRef = {
    -    startActor(Props(classOf[JobManager], configuration))
    +  /**
    +   * Create the job manager members as (instanceManager, scheduler, 
libraryCacheManager,
    +   *              archiverProps, accumulatorManager, profiler, 
defaultExecutionRetries,
    +   *              delayBetweenRetries, timeout)
    +   *
    +   * @param configuration The configuration from which to parse the config 
values.
    +   * @return The members for a default JobManager.
    +   */
    +  def createJobManagerComponents(configuration: Configuration) :
    +    (InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
    +      Props, AccumulatorManager, Option[Props], Int, Long, FiniteDuration, 
Int) = {
    +
    +    val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
    +
    +    val (archiveCount, profilingEnabled, cleanupInterval, 
executionRetries, delayBetweenRetries) =
    +      parseConfiguration(configuration)
    +
    +    val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
    +
    +    val profilerProps: Option[Props] = if (profilingEnabled) {
    +      Some(Props(classOf[JobManagerProfiler]))
    +    } else {
    +      None
    +    }
    +
    +    val accumulatorManager: AccumulatorManager = new 
AccumulatorManager(Math.min(1, archiveCount))
    +
    +    var blobServer: BlobServer = null
    +    var instanceManager: InstanceManager = null
    +    var scheduler: FlinkScheduler = null
    +    var libraryCacheManager: BlobLibraryCacheManager = null
    +
    +    try {
    +      blobServer = new BlobServer(configuration)
    +      instanceManager = new InstanceManager()
    +      scheduler = new FlinkScheduler()
    +      libraryCacheManager = new BlobLibraryCacheManager(blobServer, 
cleanupInterval)
    +
    +      instanceManager.addInstanceListener(scheduler)
    +    }
    +    catch {
    +      case t: Throwable => {
    +        if (libraryCacheManager != null) {
    +          libraryCacheManager.shutdown()
    +        }
    +        if (scheduler != null) {
    +          scheduler.shutdown()
    +        }
    +        if (instanceManager != null) {
    +          instanceManager.shutdown()
    +        }
    +        if (blobServer != null) {
    +          blobServer.shutdown()
    +        }
    +        throw t;
    +      }
    +    }
    +
    +    (instanceManager, scheduler, libraryCacheManager, archiveProps, 
accumulatorManager,
    +      profilerProps, executionRetries, delayBetweenRetries, timeout, 
archiveCount)
    +  }
    +
    +  def startActor(configuration: Configuration, actorSystem: ActorSystem): 
ActorRef = {
    +
    +    val (instanceManager, scheduler, libraryCacheManager, archiveProps, 
accumulatorManager,
    +      profilerProps, executionRetries, delayBetweenRetries,
    +      timeout, _) = createJobManagerComponents(configuration)
    +
    +    val profiler: Option[ActorRef] =
    +                 profilerProps.map( props => actorSystem.actorOf(props, 
PROFILER_NAME) )
    --- End diff --
    
    Nice functional style :-)


> Improve JobManager startup robustness
> -------------------------------------
>
>                 Key: FLINK-1529
>                 URL: https://issues.apache.org/jira/browse/FLINK-1529
>             Project: Flink
>          Issue Type: Improvement
>          Components: JobManager
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 0.9
>
>
> Currently, the JobManager is created asynchronously (as an actor). If its 
> initialization fails (for various reasons), the process does not terminate 
> and gives only a vague log message saying that an actor creation failed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to