[ https://issues.apache.org/jira/browse/FLINK-1529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14318657#comment-14318657 ]
ASF GitHub Bot commented on FLINK-1529: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/385#discussion_r24602575 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -653,11 +693,87 @@ object JobManager { (archiveCount, profilingEnabled, cleanupInterval, executionRetries, delayBetweenRetries) } - def startActor(configuration: Configuration)(implicit actorSystem: ActorSystem): ActorRef = { - startActor(Props(classOf[JobManager], configuration)) + /** + * Create the job manager members as (instanceManager, scheduler, libraryCacheManager, + * archiverProps, accumulatorManager, profiler, defaultExecutionRetries, + * delayBetweenRetries, timeout) + * + * @param configuration The configuration from which to parse the config values. + * @return The members for a default JobManager. + */ + def createJobManagerComponents(configuration: Configuration) : + (InstanceManager, FlinkScheduler, BlobLibraryCacheManager, + Props, AccumulatorManager, Option[Props], Int, Long, FiniteDuration, Int) = { + + val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) + + val (archiveCount, profilingEnabled, cleanupInterval, executionRetries, delayBetweenRetries) = + parseConfiguration(configuration) + + val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount) + + val profilerProps: Option[Props] = if (profilingEnabled) { + Some(Props(classOf[JobManagerProfiler])) + } else { + None + } + + val accumulatorManager: AccumulatorManager = new AccumulatorManager(Math.min(1, archiveCount)) + + var blobServer: BlobServer = null + var instanceManager: InstanceManager = null + var scheduler: FlinkScheduler = null + var libraryCacheManager: BlobLibraryCacheManager = null + + try { + blobServer = new BlobServer(configuration) + instanceManager = new InstanceManager() + scheduler = new FlinkScheduler() + libraryCacheManager = new 
BlobLibraryCacheManager(blobServer, cleanupInterval) + + instanceManager.addInstanceListener(scheduler) + } + catch { + case t: Throwable => { + if (libraryCacheManager != null) { + libraryCacheManager.shutdown() + } + if (scheduler != null) { + scheduler.shutdown() + } + if (instanceManager != null) { + instanceManager.shutdown() + } + if (blobServer != null) { + blobServer.shutdown() + } + throw t; + } + } + + (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager, + profilerProps, executionRetries, delayBetweenRetries, timeout, archiveCount) + } + + def startActor(configuration: Configuration, actorSystem: ActorSystem): ActorRef = { + + val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager, + profilerProps, executionRetries, delayBetweenRetries, + timeout, _) = createJobManagerComponents(configuration) + + val profiler: Option[ActorRef] = + profilerProps.map( props => actorSystem.actorOf(props, PROFILER_NAME) ) --- End diff -- Nice functional style :-) > Improve JobManager startup robustness > ------------------------------------- > > Key: FLINK-1529 > URL: https://issues.apache.org/jira/browse/FLINK-1529 > Project: Flink > Issue Type: Improvement > Components: JobManager > Affects Versions: 0.9 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Fix For: 0.9 > > > Currently, the JobManager is created asynchronously (as an actor). If its > initialization fails (for various reasons), the process does not terminate > and gives only a vague log message that an actor creation failed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)