[ 
https://issues.apache.org/jira/browse/FLINK-11813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16816558#comment-16816558
 ] 

TisonKun edited comment on FLINK-11813 at 4/12/19 6:22 PM:
-----------------------------------------------------------

Thanks for your clarification and explanation, [~till.rohrmann]. I like the 
idea of binding the entries of {{RunningJobsRegistry}} to the lifecycle of the 
cluster.

For the scope of this JIRA, the solution could be to simply keep {{DONE}} and 
never clean it up until the end of the cluster's lifecycle.

Further, I'd like to share my thoughts on how, ideally, the {{Dispatcher}} 
should schedule jobs with the help of {{SubmittedJobGraphStore}} and 
{{RunningJobsRegistry}}.

----------------------------------------

For the states of {{JobSchedulingStatus}}, since we always need to represent 
{{DONE}} explicitly, {{RunningJobsRegistry}} is necessary. However, we can 
involve {{SubmittedJobGraphStore}} when the {{Dispatcher}} decides when and how 
a job gets executed.

The underlying cause of this JIRA is how the {{Dispatcher}} schedules jobs. 
Following the discussion above, I'd like to restate how the {{Dispatcher}} 
**should** schedule jobs with the help of {{SubmittedJobGraphStore}} and 
{{RunningJobsRegistry}}, and how {{JobManager}} can be decoupled from 
{{RunningJobsRegistry}}.

 1. When a job is submitted for the first time, i.e., the store has no graph 
for it and its registry status remains {{NONE}}, the {{Dispatcher}} should, in 
order:

     # Persist the job graph in the job graph store
     # Set the registry status of the job to {{RUNNING}}
     # Launch a {{JobManagerRunner}} and tell it to start a {{JobMaster}} on 
granted leadership. (decouple jm from registry)

       (If either step 1 or step 2 fails, the submission is considered failed 
and the registry entry remains {{NONE}}.)

2. When a job is submitted and the {{Dispatcher}} finds that its registry 
status is not {{NONE}}, it rejects the submission. (statement 4)

3. Whenever a {{Dispatcher}} gains leadership, it lists the entries of the jobs 
registry and tries to recover, from the job graph store, each job whose status 
is {{RUNNING}}. (On master failover, the {{Dispatcher}} would tell the jm to 
reconcile the job instead of scheduling it directly.)

4. When a job reaches a terminal state, the Dispatcher sets the state to 
{{DONE}}.

The statements above are similar to the 7 statements of [~till.rohrmann], but 
they clarify how the {{Dispatcher}} launches a job and emphasize that we can 
decouple the jm from the registry, because how a job gets executed can be 
decided by the {{Dispatcher}} instead of the jm.
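
To make the four statements concrete, here is a minimal in-memory sketch in 
Java. It is not Flink's actual {{Dispatcher}}: the class {{DispatcherSketch}}, 
its methods, the string-based job ids and graphs, and the two maps standing in 
for {{SubmittedJobGraphStore}} and {{RunningJobsRegistry}} are all 
hypothetical, and the in-memory writes cannot fail the way a ZooKeeper-backed 
store could.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the proposed protocol; not Flink's real Dispatcher. */
class DispatcherSketch {
    enum Status { NONE, RUNNING, DONE }

    // Stand-ins for SubmittedJobGraphStore and RunningJobsRegistry. They are
    // passed in so that several dispatchers can share them, as HA services would.
    private final Map<String, String> jobGraphStore;
    private final Map<String, Status> registry;

    // Jobs launched by this dispatcher: job id -> "schedule" or "reconcile".
    final Map<String, String> launched = new HashMap<>();

    DispatcherSketch(Map<String, String> jobGraphStore, Map<String, Status> registry) {
        this.jobGraphStore = jobGraphStore;
        this.registry = registry;
    }

    Status statusOf(String jobId) {
        // A job without a registry entry is treated as NONE.
        return registry.getOrDefault(jobId, Status.NONE);
    }

    /** Statements 1 and 2: returns false if the submission is rejected. */
    boolean submit(String jobId, String jobGraph) {
        if (statusOf(jobId) != Status.NONE) {
            return false; // statement 2: job is already RUNNING or DONE
        }
        jobGraphStore.put(jobId, jobGraph);  // step 1: persist the job graph
        registry.put(jobId, Status.RUNNING); // step 2: mark the job RUNNING
        launched.put(jobId, "schedule");     // step 3: launch a runner for it
        return true;
    }

    /** Statement 3: recover every RUNNING job that has a persisted graph. */
    List<String> onGrantLeadership() {
        List<String> recovered = new ArrayList<>();
        for (Map.Entry<String, Status> entry : registry.entrySet()) {
            String jobId = entry.getKey();
            if (entry.getValue() == Status.RUNNING && jobGraphStore.containsKey(jobId)) {
                // After a failover the job is reconciled, not scheduled afresh.
                launched.put(jobId, "reconcile");
                recovered.add(jobId);
            }
        }
        return recovered;
    }

    /** Statement 4: a terminated job stays in the registry as DONE. */
    void onJobTerminated(String jobId) {
        registry.put(jobId, Status.DONE);
        launched.remove(jobId);
    }
}
```

In this sketch, a standby dispatcher that shares the two maps with the former 
leader recovers only the jobs still marked {{RUNNING}} when it gains 
leadership. A job marked {{DONE}} is neither recovered nor accepted again, 
which is the behavior this JIRA asks for.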

Some comments on master failover (where {{RUNNING}} comes into use):
1. A job that the {{Dispatcher}} recovers with registry status {{RUNNING}} 
should be launched with a jm that first tries to reconcile instead of 
scheduling directly.
2. Further, the {{Dispatcher}} might establish a heartbeat monitor with job 
managers to detect master failover.

----------------------------------------

As [~zhuzh] mentioned, there is a need to unify the {{SubmittedJobGraphStore}} 
for session mode and job mode. Following our discussions, a cluster (job or 
session) follows the process: 1. start a dispatcher 2. submit the job 
3. execute the job 4. finish the job. If the job cluster can follow this 
pattern, i.e., first start a dispatcher and then **submit** the job graph to 
the dispatcher, we would not need a specific 
{{SingleJobSubmittedJobGraphStore}} and could always use 
{{highAvailabilityServices#getSubmittedJobGraphStore}}.

This is quite out of scope here, and I think more of the effort should go into 
{{CliFrontend}}.



> Standby per job mode Dispatchers don't know job's JobSchedulingStatus
> ---------------------------------------------------------------------
>
>                 Key: FLINK-11813
>                 URL: https://issues.apache.org/jira/browse/FLINK-11813
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.6.4, 1.7.2, 1.8.0
>            Reporter: Till Rohrmann
>            Priority: Major
>
> At the moment, it can happen that standby {{Dispatchers}} in per job mode 
> will restart a terminated job after they gained leadership. The problem is 
> that we currently clear the {{RunningJobsRegistry}} once a job has reached a 
> globally terminal state. After the leading {{Dispatcher}} terminates, a 
> standby {{Dispatcher}} will gain leadership. Without having the information 
> from the {{RunningJobsRegistry}} it cannot tell whether the job has been 
> executed or whether the {{Dispatcher}} needs to re-execute the job. At the 
> moment, the {{Dispatcher}} will assume that there was a fault and hence 
> re-execute the job. This can lead to duplicate results.
> I think we need some way to tell standby {{Dispatchers}} that a certain job 
> has been successfully executed. One trivial solution could be to not clean up 
> the {{RunningJobsRegistry}} but then we will clutter ZooKeeper.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
