Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5431#discussion_r168126480
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ---
    @@ -163,29 +210,130 @@ protected void runCluster(Configuration 
configuration) throws Exception {
                                blobServer,
                                heartbeatServices,
                                metricRegistry);
    +
    +                   // TODO: Make shutDownAndTerminate non blocking to not 
use the global executor
    +                   dispatcher.getTerminationFuture().whenCompleteAsync(
    +                           (Boolean success, Throwable throwable) -> {
    +                                   if (throwable != null) {
    +                                           LOG.info("Could not properly 
terminate the Dispatcher.", throwable);
    +                                   }
    +
    +                                   shutDownAndTerminate(
    +                                           SUCCESS_RETURN_CODE,
    +                                           ApplicationStatus.SUCCEEDED,
    +                                           true);
    +                           });
                }
        }
     
        protected void initializeServices(Configuration configuration) throws 
Exception {
    -           assert(Thread.holdsLock(lock));
     
                LOG.info("Initializing cluster services.");
     
    -           final String bindAddress = 
configuration.getString(JobManagerOptions.ADDRESS);
    -           // TODO: Add support for port ranges
    -           final String portRange = 
String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
    -
    -           commonRpcService = createRpcService(configuration, bindAddress, 
portRange);
    -           haServices = createHaServices(configuration, 
commonRpcService.getExecutor());
    -           blobServer = new BlobServer(configuration, 
haServices.createBlobStore());
    -           blobServer.start();
    -           heartbeatServices = createHeartbeatServices(configuration);
    -           metricRegistry = createMetricRegistry(configuration);
    -
    -           // TODO: This is a temporary hack until we have ported the 
MetricQueryService to the new RpcEndpoint
    -           // start the MetricQueryService
    -           final ActorSystem actorSystem = ((AkkaRpcService) 
commonRpcService).getActorSystem();
    -           metricRegistry.startQueryService(actorSystem, null);
    +           synchronized (lock) {
    +                   final String bindAddress = 
configuration.getString(JobManagerOptions.ADDRESS);
    +                   // TODO: Add support for port ranges
    +                   final String portRange = 
String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
    +
    +                   commonRpcService = createRpcService(configuration, 
bindAddress, portRange);
    +                   haServices = createHaServices(configuration, 
commonRpcService.getExecutor());
    +                   blobServer = new BlobServer(configuration, 
haServices.createBlobStore());
    +                   blobServer.start();
    +                   heartbeatServices = 
createHeartbeatServices(configuration);
    +                   metricRegistry = createMetricRegistry(configuration);
    +
    +                   // TODO: This is a temporary hack until we have ported 
the MetricQueryService to the new RpcEndpoint
    +                   // start the MetricQueryService
    +                   final ActorSystem actorSystem = ((AkkaRpcService) 
commonRpcService).getActorSystem();
    +                   metricRegistry.startQueryService(actorSystem, null);
    +           }
    +   }
    +
    +   protected void startClusterComponents(
    +           Configuration configuration,
    --- End diff --
    
    nit: indentation


---

Reply via email to