tillrohrmann commented on a change in pull request #13217:
URL: https://github.com/apache/flink/pull/13217#discussion_r475556728
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -427,21 +389,39 @@ private JobManagerRunner
startJobManagerRunner(JobManagerRunner jobManagerRunner
}
}
} else {
- log.debug("There is a newer
JobManagerRunner for the job {}.", jobId);
+ log.debug("Job {} is not
registered anymore at dispatcher", jobId);
}
-
return null;
}, getMainThreadExecutor()));
+ }
- jobManagerRunner.start();
-
- return jobManagerRunner;
+ CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph
jobGraph) {
+ final RpcService rpcService = getRpcService();
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ JobManagerRunner runner =
jobManagerRunnerFactory.createJobManagerRunner(
+ jobGraph,
+ configuration,
+ rpcService,
+ highAvailabilityServices,
+ heartbeatServices,
+ jobManagerSharedServices,
+ new
DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
+ fatalErrorHandler);
+ runner.start();
+ return runner;
+ } catch (Exception e) {
+ throw new CompletionException(new
JobExecutionException(jobGraph.getJobID(), "Could not instantiate JobManager.",
e));
+ }
+ },
+ rpcService.getExecutor()); // do not use main thread
executor. Otherwise, Dispatcher is blocked on JobManager creation
Review comment:
Great, thanks.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org