[ https://issues.apache.org/jira/browse/FLINK-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364153#comment-16364153 ]
ASF GitHub Bot commented on FLINK-8608: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5431#discussion_r168193217 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -163,29 +210,130 @@ protected void runCluster(Configuration configuration) throws Exception { blobServer, heartbeatServices, metricRegistry); + + // TODO: Make shutDownAndTerminate non blocking to not use the global executor + dispatcher.getTerminationFuture().whenCompleteAsync( + (Boolean success, Throwable throwable) -> { + if (throwable != null) { + LOG.info("Could not properly terminate the Dispatcher.", throwable); + } + + shutDownAndTerminate( + SUCCESS_RETURN_CODE, + ApplicationStatus.SUCCEEDED, + true); + }); } } protected void initializeServices(Configuration configuration) throws Exception { - assert(Thread.holdsLock(lock)); LOG.info("Initializing cluster services."); - final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS); - // TODO: Add support for port ranges - final String portRange = String.valueOf(configuration.getInteger(JobManagerOptions.PORT)); - - commonRpcService = createRpcService(configuration, bindAddress, portRange); - haServices = createHaServices(configuration, commonRpcService.getExecutor()); - blobServer = new BlobServer(configuration, haServices.createBlobStore()); - blobServer.start(); - heartbeatServices = createHeartbeatServices(configuration); - metricRegistry = createMetricRegistry(configuration); - - // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint - // start the MetricQueryService - final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem(); - metricRegistry.startQueryService(actorSystem, null); + synchronized (lock) { + final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS); + // TODO: Add support 
for port ranges + final String portRange = String.valueOf(configuration.getInteger(JobManagerOptions.PORT)); + + commonRpcService = createRpcService(configuration, bindAddress, portRange); + haServices = createHaServices(configuration, commonRpcService.getExecutor()); + blobServer = new BlobServer(configuration, haServices.createBlobStore()); + blobServer.start(); + heartbeatServices = createHeartbeatServices(configuration); + metricRegistry = createMetricRegistry(configuration); + + // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint + // start the MetricQueryService + final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem(); + metricRegistry.startQueryService(actorSystem, null); + } + } + + protected void startClusterComponents( + Configuration configuration, --- End diff -- Good catch. Will fix it. > Add MiniDispatcher for job mode > ------------------------------- > > Key: FLINK-8608 > URL: https://issues.apache.org/jira/browse/FLINK-8608 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > In order to properly support the job mode, we need a {{MiniDispatcher}} which > is started with a pre-initialized {{JobGraph}} and launches a single > {{JobManagerRunner}} with this job. Once the job is completed and if the > {{MiniDispatcher}} is running in detached mode, the {{MiniDispatcher}} should > terminate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)