GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5431#discussion_r168193217
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
---
@@ -163,29 +210,130 @@ protected void runCluster(Configuration configuration) throws Exception {
blobServer,
heartbeatServices,
metricRegistry);
+
+ // TODO: Make shutDownAndTerminate non blocking to not use the global executor
+ dispatcher.getTerminationFuture().whenCompleteAsync(
+ (Boolean success, Throwable throwable) -> {
+ if (throwable != null) {
+ LOG.info("Could not properly terminate the Dispatcher.", throwable);
+ }
+
+ shutDownAndTerminate(
+ SUCCESS_RETURN_CODE,
+ ApplicationStatus.SUCCEEDED,
+ true);
+ });
}
}
protected void initializeServices(Configuration configuration) throws
Exception {
- assert(Thread.holdsLock(lock));
LOG.info("Initializing cluster services.");
- final String bindAddress =
configuration.getString(JobManagerOptions.ADDRESS);
- // TODO: Add support for port ranges
- final String portRange =
String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
-
- commonRpcService = createRpcService(configuration, bindAddress,
portRange);
- haServices = createHaServices(configuration,
commonRpcService.getExecutor());
- blobServer = new BlobServer(configuration,
haServices.createBlobStore());
- blobServer.start();
- heartbeatServices = createHeartbeatServices(configuration);
- metricRegistry = createMetricRegistry(configuration);
-
- // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
- // start the MetricQueryService
- final ActorSystem actorSystem = ((AkkaRpcService)
commonRpcService).getActorSystem();
- metricRegistry.startQueryService(actorSystem, null);
+ synchronized (lock) {
+ final String bindAddress =
configuration.getString(JobManagerOptions.ADDRESS);
+ // TODO: Add support for port ranges
+ final String portRange =
String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
+
+ commonRpcService = createRpcService(configuration,
bindAddress, portRange);
+ haServices = createHaServices(configuration,
commonRpcService.getExecutor());
+ blobServer = new BlobServer(configuration,
haServices.createBlobStore());
+ blobServer.start();
+ heartbeatServices =
createHeartbeatServices(configuration);
+ metricRegistry = createMetricRegistry(configuration);
+
+ // TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
+ // start the MetricQueryService
+ final ActorSystem actorSystem = ((AkkaRpcService)
commonRpcService).getActorSystem();
+ metricRegistry.startQueryService(actorSystem, null);
+ }
+ }
+
+ protected void startClusterComponents(
+ Configuration configuration,
--- End diff --
Good catch. Will fix it.
---