tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only
react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217291912
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
@Override
public void onAddedJobGraph(final JobID jobId) {
- final CompletableFuture<SubmittedJobGraph> recoveredJob =
getRpcService().execute(
- () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
- final CompletableFuture<Acknowledge> submissionFuture =
recoveredJob.thenComposeAsync(
- (SubmittedJobGraph submittedJobGraph) ->
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
- getMainThreadExecutor());
-
- submissionFuture.whenComplete(
- (Acknowledge acknowledge, Throwable throwable) -> {
- if (throwable != null) {
- onFatalError(
- new DispatcherException(
- String.format("Could
not start the added job %s", jobId),
-
ExceptionUtils.stripCompletionException(throwable)));
+ runAsync(
+ () -> {
+ if (!jobManagerRunners.containsKey(jobId)) {
+ final CompletableFuture<JobGraph>
recoveredJob = recoveryOperation.thenApplyAsync(
+ ignored -> {
+ try {
+ return
recoverJob(jobId);
+ } catch (Exception e) {
+
ExceptionUtils.rethrow(e);
+ }
+ return null;
+ },
+ getRpcService().getExecutor());
+
+ final DispatcherId dispatcherId =
getFencingToken();
+ final CompletableFuture<Void>
submissionFuture = recoveredJob.thenComposeAsync(
+
(FunctionWithThrowable<JobGraph, CompletableFuture<Void>, Exception>) (JobGraph
jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+ .thenAcceptAsync(
+
(ConsumerWithException<Boolean, Exception>) (Boolean isRecoveredJobRunning) -> {
Review comment:
Good point. I like this approach better. Will adapt the existing interfaces.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services