[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612349#comment-16612349
 ] 

ASF GitHub Bot commented on FLINK-10255:
----------------------------------------

GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##########
 @@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
 
        @Override
        public void onAddedJobGraph(final JobID jobId) {
-               final CompletableFuture<SubmittedJobGraph> recoveredJob = getRpcService().execute(
-                       () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-               final CompletableFuture<Acknowledge> submissionFuture = recoveredJob.thenComposeAsync(
-                       (SubmittedJobGraph submittedJobGraph) -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-                       getMainThreadExecutor());
-
-               submissionFuture.whenComplete(
-                       (Acknowledge acknowledge, Throwable throwable) -> {
-                               if (throwable != null) {
-                                       onFatalError(
-                                               new DispatcherException(
-                                                       String.format("Could not start the added job %s", jobId),
-                                                       ExceptionUtils.stripCompletionException(throwable)));
+               runAsync(
+                       () -> {
+                               if (!jobManagerRunners.containsKey(jobId)) {
+                                       final CompletableFuture<JobGraph> recoveredJob = recoveryOperation.thenApplyAsync(
+                                               ignored -> {
+                                                       try {
+                                                               return recoverJob(jobId);
+                                                       } catch (Exception e) {
+                                                               ExceptionUtils.rethrow(e);
+                                                       }
+                                                       return null;
+                                               },
+                                               getRpcService().getExecutor());
+
+                                       final DispatcherId dispatcherId = getFencingToken();
+                                       final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
+                                               (FunctionWithThrowable<JobGraph, CompletableFuture<Void>, Exception>) (JobGraph jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+                                                       .thenAcceptAsync(
+                                                               (ConsumerWithException<Boolean, Exception>) (Boolean isRecoveredJobRunning) -> {
 
 Review comment:
   Imo we are doing this wrong. The code would be much more readable with 
static factory methods:
   
   ```
   
   /**
    * {@link Consumer} that can throw checked exceptions.
    */
   @FunctionalInterface
   public interface CheckedConsumer<T> {
   
        void checkedAccept(T t) throws Exception;
   
        static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
                return (t) -> {
                        try {
                                checkedConsumer.checkedAccept(t);
                        } catch (Exception e) {
                                ExceptionUtils.rethrow(e);
                        }
                };
        }
   }
   ```
   This allows for:
   ```
   CheckedConsumer.unchecked(isRecoveredJobRunning -> {
        ...
   });
   ``` 
   No casts are required. Also, when interacting with the Java API, it does not 
matter what exact type of exception can be thrown; what matters is that the 
checked exception becomes an unchecked one. We do not need to generify the 
exception type in `ConsumerWithException`. 
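
   To make the suggestion concrete, here is a minimal, self-contained sketch of
how such a factory method could be used with `CompletableFuture`. It is only an
illustration under assumptions: the class name `CheckedConsumerExample` and the
helper `requireRunning` are hypothetical, and the `catch` blocks merely
approximate what `ExceptionUtils.rethrow` does so that the example compiles
without Flink on the classpath.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class CheckedConsumerExample {

    /** {@link Consumer} that can throw checked exceptions. */
    @FunctionalInterface
    public interface CheckedConsumer<T> {

        void checkedAccept(T t) throws Exception;

        /** Wraps a CheckedConsumer so it can be passed where a Consumer is expected. */
        static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
            return t -> {
                try {
                    checkedConsumer.checkedAccept(t);
                } catch (RuntimeException e) {
                    // Already unchecked: propagate as is.
                    throw e;
                } catch (Exception e) {
                    // Assumption: stand-in for ExceptionUtils.rethrow(e).
                    throw new RuntimeException(e);
                }
            };
        }
    }

    /** Hypothetical callback body that throws a checked exception. */
    public static void requireRunning(Boolean isRecoveredJobRunning) throws Exception {
        if (!isRecoveredJobRunning) {
            throw new Exception("recovered job was not started");
        }
    }

    public static void main(String[] args) {
        // No cast is needed: the lambda's target type is CheckedConsumer<Boolean>.
        CompletableFuture<Void> succeeded = CompletableFuture.completedFuture(true)
            .thenAccept(CheckedConsumer.unchecked(isRecoveredJobRunning -> requireRunning(isRecoveredJobRunning)));

        // The checked exception surfaces as an exceptionally completed future.
        CompletableFuture<Void> failed = CompletableFuture.completedFuture(false)
            .thenAccept(CheckedConsumer.unchecked(isRecoveredJobRunning -> requireRunning(isRecoveredJobRunning)));

        System.out.println("succeeded exceptionally: " + succeeded.isCompletedExceptionally());
        System.out.println("failed exceptionally: " + failed.isCompletedExceptionally());
    }
}
```

   The same pattern would presumably carry over to a function-shaped variant
for the `thenComposeAsync` call, which would remove the
`FunctionWithThrowable` cast as well.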

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Standby Dispatcher locks submitted JobGraphs
> --------------------------------------------
>
>                 Key: FLINK-10255
>                 URL: https://issues.apache.org/jira/browse/FLINK-10255
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.3, 1.6.0, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> Currently, standby {{Dispatchers}} lock submitted {{JobGraphs}} which are 
> added to the {{SubmittedJobGraphStore}} if HA mode is enabled. Locking the 
> {{JobGraphs}} can prevent their cleanup leaving the system in an inconsistent 
> state.
> The problem is that we recover in the 
> {{SubmittedJobGraphListener#onAddedJobGraph}} callback which is also called 
> if we don't have the leadership for the newly added {{JobGraph}}. Recovering the 
> {{JobGraph}} currently locks the {{JobGraph}}. In case that the 
> {{Dispatcher}} is not the leader, then we won't start that job after its 
> recovery. However, we also don't release the {{JobGraph}} leaving it locked.
> There are two possible solutions to the problem. Either we check whether we 
> are the leader before recovering jobs or we say that recovering jobs does not 
> lock them. Only if we can submit the recovered job we lock them. The latter 
> approach has the advantage that it follows a quite similar code path as an 
> initial job submission. Moreover, jobs are currently also recovered at other 
> places. In all these places we currently would need to release the 
> {{JobGraphs}} if we cannot submit the recovered {{JobGraph}} (e.g. 
> {{Dispatcher#grantLeadership}}).
> An extension of the first solution could be to stop the 
> {{SubmittedJobGraphStore}} while the {{Dispatcher}} is not the leader. Then 
> we would have to make sure that no concurrent callback from the 
> {{SubmittedJobGraphStore#SubmittedJobGraphListener}} can be executed after 
> revoking leadership from the {{Dispatcher}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
