[ https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15902273#comment-15902273 ]

ASF GitHub Bot commented on FLINK-5971:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3488#discussion_r105067829
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java ---
    @@ -283,5 +302,31 @@ public void handleError(Exception exception) {
                                        JobLeaderIdListener.class.getSimpleName(), exception);
                        }
                }
    +
    +           private void activateTimeout() {
    +                   if (timeoutId != null) {
    +                           cancelTimeout();
    +                   }
    +
    +                   final UUID newTimeoutId = UUID.randomUUID();
    +
    +                   timeoutId = newTimeoutId;
    +
    +                   timeoutFuture = scheduledExecutor.schedule(new Runnable() {
    +                           @Override
    +                           public void run() {
    +                                   listenerJobLeaderIdActions.notifyJobTimeout(jobId, newTimeoutId);
    --- End diff --
    
    I think this thread is different from the leader retrieval thread, which
    calls `notifyLeaderAddress`, right?
    So is it possible that, when a job is close to timing out and a leader
    address notification arrives at the same time, both threads try to
    `cancelTimeout`? An NPE would then be thrown if one of them calls
    `timeoutFuture.cancel(true)` just after the other has set
    `timeoutFuture = null`.
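One way to rule out the interleaving described above is to make the null check and the clear in `cancelTimeout` atomic. The following is a minimal sketch under stated assumptions, not the code under review: `JobTimeout`, `onTimeout`, and `isValidTimeout` are hypothetical names, and a plain `java.util.concurrent.ScheduledExecutorService` stands in for the `scheduledExecutor` in the diff. All reads and writes of `timeoutId` and `timeoutFuture` happen under the object's monitor, so a second caller can never see a half-cleared state.

```java
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical per-job timeout holder (not the class under review). Every access to
// timeoutId/timeoutFuture happens under this object's monitor, so two threads calling
// cancelTimeout() cannot interleave between the null check and the cancel(true) call.
class JobTimeout {
    private final ScheduledExecutorService scheduledExecutor;
    private final Consumer<UUID> onTimeout; // hypothetical stand-in for notifyJobTimeout(jobId, timeoutId)
    private final long timeoutMillis;

    private UUID timeoutId;                  // null while no timeout is armed
    private ScheduledFuture<?> timeoutFuture;

    JobTimeout(ScheduledExecutorService scheduledExecutor, Consumer<UUID> onTimeout, long timeoutMillis) {
        this.scheduledExecutor = scheduledExecutor;
        this.onTimeout = onTimeout;
        this.timeoutMillis = timeoutMillis;
    }

    synchronized void activateTimeout() {
        cancelTimeout(); // null-safe, and re-entrant because we already hold the monitor
        final UUID newTimeoutId = UUID.randomUUID();
        timeoutId = newTimeoutId;
        // The callback runs on the executor thread without this lock; the receiver is
        // expected to check isValidTimeout(id) first, since a cancel may race with it.
        timeoutFuture = scheduledExecutor.schedule(
                () -> onTimeout.accept(newTimeoutId), timeoutMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void cancelTimeout() {
        if (timeoutFuture != null) {
            timeoutFuture.cancel(true);
        }
        timeoutFuture = null;
        timeoutId = null;
    }

    synchronized boolean isValidTimeout(UUID id) {
        return id != null && id.equals(timeoutId);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        JobTimeout timeout = new JobTimeout(executor, id -> System.out.println("timed out: " + id), 10_000);
        timeout.activateTimeout();
        // Two threads cancelling at once; with the monitor held, neither can hit an NPE.
        Thread a = new Thread(timeout::cancelTimeout);
        Thread b = new Thread(timeout::cancelTimeout);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println("cancelled concurrently without an NPE");
        executor.shutdownNow();
    }
}
```

A lock is only one option; confining all calls that touch this state to a single thread would avoid the race as well, and which fits better depends on how the service is invoked.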


> JobLeaderIdService should time out registered jobs
> --------------------------------------------------
>
>                 Key: FLINK-5971
>                 URL: https://issues.apache.org/jira/browse/FLINK-5971
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>              Labels: flip-6
>
> The {{JobLeaderIdService}} has no mechanism to time out inactive jobs. At the 
> moment it relies on the {{RunningJobsRegistry}}, which only gives a heuristic 
> answer.
> We should remove the {{RunningJobsRegistry}} and instead register a timeout 
> for each job that has no associated job leader.
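The approach proposed in the issue (a timeout for every job without a job leader) can be sketched as follows. This is a hypothetical, self-contained illustration, not the code in the pull request: job IDs are plain `String`s, and `LeaderlessJobTimeouts`, `leaderGained`, `leaderLost`, and `onJobTimedOut` are invented names. A `java.util.concurrent.ScheduledExecutorService` is assumed as the scheduler. Each armed timeout carries a fresh `UUID`, mirroring the `timeoutId` in the diff above, so a timeout that fires after the job has regained a leader is recognized as stale and ignored.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: arm a timeout for every registered job without a leader,
// disarm it when a leader appears, and drop the job if the timeout expires.
// arm() and disarm() are only called while holding this object's monitor.
class LeaderlessJobTimeouts {
    private static final class Pending {
        final UUID id;
        final ScheduledFuture<?> future;
        Pending(UUID id, ScheduledFuture<?> future) { this.id = id; this.future = future; }
    }

    private final ScheduledExecutorService executor;
    private final long timeoutMillis;
    private final Consumer<String> onJobTimedOut;
    private final Set<String> jobs = new HashSet<>();
    private final Map<String, Pending> pending = new HashMap<>(); // leaderless jobs only

    LeaderlessJobTimeouts(ScheduledExecutorService executor, long timeoutMillis,
                          Consumer<String> onJobTimedOut) {
        this.executor = executor;
        this.timeoutMillis = timeoutMillis;
        this.onJobTimedOut = onJobTimedOut;
    }

    synchronized void addJob(String jobId) {
        if (jobs.add(jobId)) {
            arm(jobId); // a newly added job has no known leader yet
        }
    }

    synchronized void leaderGained(String jobId) {
        disarm(jobId);
    }

    synchronized void leaderLost(String jobId) {
        if (jobs.contains(jobId)) {
            arm(jobId);
        }
    }

    synchronized boolean containsJob(String jobId) {
        return jobs.contains(jobId);
    }

    private void arm(String jobId) {
        disarm(jobId);
        UUID id = UUID.randomUUID();
        ScheduledFuture<?> future = executor.schedule(
                () -> expire(jobId, id), timeoutMillis, TimeUnit.MILLISECONDS);
        pending.put(jobId, new Pending(id, future));
    }

    private void disarm(String jobId) {
        Pending p = pending.remove(jobId);
        if (p != null) {
            p.future.cancel(false);
        }
    }

    // Runs on the executor thread.
    private void expire(String jobId, UUID id) {
        boolean expired;
        synchronized (this) {
            Pending p = pending.get(jobId);
            // A missing or mismatching id means this timeout was cancelled or superseded.
            expired = p != null && p.id.equals(id);
            if (expired) {
                pending.remove(jobId);
                jobs.remove(jobId);
            }
        }
        if (expired) {
            onJobTimedOut.accept(jobId); // call out without holding the lock
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        LeaderlessJobTimeouts timeouts = new LeaderlessJobTimeouts(
                executor, 100, jobId -> System.out.println("timed out: " + jobId));
        timeouts.addJob("job-a");
        timeouts.addJob("job-b");
        timeouts.leaderGained("job-a"); // job-a finds its leader before the timeout
        Thread.sleep(500);
        System.out.println("job-a registered: " + timeouts.containsJob("job-a"));
        System.out.println("job-b registered: " + timeouts.containsJob("job-b"));
        executor.shutdownNow();
    }
}
```

Unlike a registry lookup, this gives a definite answer: a job is removed only if it stayed leaderless for the whole timeout interval.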



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
