[
https://issues.apache.org/jira/browse/FLINK-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363722#comment-16363722
]
ASF GitHub Bot commented on FLINK-8608:
---------------------------------------
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5431#discussion_r168126480
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
---
@@ -163,29 +210,130 @@ protected void runCluster(Configuration
configuration) throws Exception {
blobServer,
heartbeatServices,
metricRegistry);
+
+ // TODO: Make shutDownAndTerminate non blocking to not
use the global executor
+ dispatcher.getTerminationFuture().whenCompleteAsync(
+ (Boolean success, Throwable throwable) -> {
+ if (throwable != null) {
+ LOG.info("Could not properly
terminate the Dispatcher.", throwable);
+ }
+
+ shutDownAndTerminate(
+ SUCCESS_RETURN_CODE,
+ ApplicationStatus.SUCCEEDED,
+ true);
+ });
}
}
protected void initializeServices(Configuration configuration) throws
Exception {
- assert(Thread.holdsLock(lock));
LOG.info("Initializing cluster services.");
- final String bindAddress =
configuration.getString(JobManagerOptions.ADDRESS);
- // TODO: Add support for port ranges
- final String portRange =
String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
-
- commonRpcService = createRpcService(configuration, bindAddress,
portRange);
- haServices = createHaServices(configuration,
commonRpcService.getExecutor());
- blobServer = new BlobServer(configuration,
haServices.createBlobStore());
- blobServer.start();
- heartbeatServices = createHeartbeatServices(configuration);
- metricRegistry = createMetricRegistry(configuration);
-
- // TODO: This is a temporary hack until we have ported the
MetricQueryService to the new RpcEndpoint
- // start the MetricQueryService
- final ActorSystem actorSystem = ((AkkaRpcService)
commonRpcService).getActorSystem();
- metricRegistry.startQueryService(actorSystem, null);
+ synchronized (lock) {
+ final String bindAddress =
configuration.getString(JobManagerOptions.ADDRESS);
+ // TODO: Add support for port ranges
+ final String portRange =
String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
+
+ commonRpcService = createRpcService(configuration,
bindAddress, portRange);
+ haServices = createHaServices(configuration,
commonRpcService.getExecutor());
+ blobServer = new BlobServer(configuration,
haServices.createBlobStore());
+ blobServer.start();
+ heartbeatServices =
createHeartbeatServices(configuration);
+ metricRegistry = createMetricRegistry(configuration);
+
+ // TODO: This is a temporary hack until we have ported
the MetricQueryService to the new RpcEndpoint
+ // start the MetricQueryService
+ final ActorSystem actorSystem = ((AkkaRpcService)
commonRpcService).getActorSystem();
+ metricRegistry.startQueryService(actorSystem, null);
+ }
+ }
+
+ protected void startClusterComponents(
+ Configuration configuration,
--- End diff --
nit: indentation
> Add MiniDispatcher for job mode
> -------------------------------
>
> Key: FLINK-8608
> URL: https://issues.apache.org/jira/browse/FLINK-8608
> Project: Flink
> Issue Type: New Feature
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to properly support the job mode, we need a {{MiniDispatcher}} which
> is started with a pre initialized {{JobGraph}} and launches a single
> {{JobManagerRunner}} with this job. Once the job is completed and if the
> {{MiniDispatcher}} is running in detached mode, the {{MiniDispatcher}} should
> terminate.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)