[ https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885860#comment-15885860 ]

ASF GitHub Bot commented on FLINK-5703:
---------------------------------------

Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/3340
  
    @StephanEwen, thanks for the careful review!
    
    I have attached the design doc to this JIRA issue to explain the implementation considerations.
    
[https://docs.google.com/document/d/1rm3BYQyC-8GrLgdwSUAbkJ0aV8IaCneF6c5dk4G3SXY/edit](https://docs.google.com/document/d/1rm3BYQyC-8GrLgdwSUAbkJ0aV8IaCneF6c5dk4G3SXY/edit)
    
    Regarding your suggestions above:
    
    1. I fully agree with your point about keeping the current final variables related to **Execution**, **IntermediateResult**, etc. Although my implementation already tries to avoid concurrency issues for these variables, creating new instances to replace the previous ones is still the better choice, and I am happy to re-implement the related logic accordingly.
    
    2. In JobManagerRunner's **grantLeadership**, the new job leader should not be exposed to TaskManagers before reconciliation, so the JobManager runs the reconciliation process before **confirmLeadership**. As you suggest, creating the JobManager and ExecutionGraph during reconciliation would also make sense. To keep the changes minimal, I would keep the current approach unless there are other reasons not to. What do you think?
    
    3. Splitting reconciliation into two phases is mainly about the actions triggered during state transitions. For example, a transition from **RECONCILING** to **RUNNING** should call **sendPartitionInfos**, as in the existing logic, and this action depends on the reconciliation of the upstream executions. In other words, reconciling an upstream execution caches its partition infos in the **PartialInputChannelDeploymentDescriptor** of the downstream execution, and when the downstream execution switches to **RUNNING**, these partition infos are sent to it. Because we cannot control the order in which executions are reconciled, dividing the process into two steps handles this dependency.
    However, after your recent improvements to the **EAGER** **ScheduleMode**, I think downstream tasks are already submitted with their final upstream partition infos, so there may be no need to send partition infos to running downstream tasks again. If my understanding is correct, I can re-implement this part and merge the two phases into one based on the TaskManager reports.
    In addition, a reported FAILED or CANCELED execution state fails the whole ExecutionGraph, so we can end the reconciliation period immediately; any subsequent reports are then refused, and the TaskManagers fail the corresponding tasks. On the JobManager side, when the first reported FAILED or CANCELED state triggers failing the ExecutionGraph, the other executions still in RECONCILING state can transition to CANCELED directly, without sending a cancel RPC message. What do you think?
    
    I will finish these modifications this week, and any further advice is welcome!
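For illustration, the single-phase variant proposed in point 3 (apply each TaskManager report as it arrives, end reconciliation on the first FAILED or CANCELED report, and move the remaining RECONCILING executions to CANCELED locally without an RPC) might look roughly like the sketch below. This is a hypothetical model, not Flink code: `ReconciliationSketch`, `ExecState`, `onReport` and `failGraph` are names invented for this example, and executions are keyed by plain strings rather than `ExecutionAttemptID`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of single-phase reconciliation with fail-fast handling.
 * Not Flink code: execution attempts are plain strings here.
 */
class ReconciliationSketch {

    enum ExecState { RECONCILING, RUNNING, FINISHED, CANCELED, FAILED }

    private final Map<String, ExecState> executions = new HashMap<>();
    private boolean reconciling = true;
    private boolean graphFailed = false;

    ReconciliationSketch(List<String> attemptIds) {
        // Every execution starts in RECONCILING until its TaskManager reports.
        for (String id : attemptIds) {
            executions.put(id, ExecState.RECONCILING);
        }
    }

    /** Applies one TaskManager report; returns false if the report is refused. */
    boolean onReport(String attemptId, ExecState reported) {
        if (!reconciling || executions.get(attemptId) != ExecState.RECONCILING) {
            // Reconciliation has ended, or the attempt is unknown or already reported:
            // the TaskManager is expected to fail the corresponding task.
            return false;
        }
        executions.put(attemptId, reported);
        if (reported == ExecState.FAILED || reported == ExecState.CANCELED) {
            failGraph();
        } else if (!executions.containsValue(ExecState.RECONCILING)) {
            // Every execution has been reported: reconciliation is complete.
            reconciling = false;
        }
        return true;
    }

    private void failGraph() {
        graphFailed = true;
        reconciling = false; // end the reconciliation period immediately
        // Executions still RECONCILING become CANCELED locally, without a cancel RPC.
        // (Already RUNNING executions would go through the normal cancel path, not modeled here.)
        executions.replaceAll((id, s) -> s == ExecState.RECONCILING ? ExecState.CANCELED : s);
    }

    ExecState stateOf(String attemptId) { return executions.get(attemptId); }
    boolean isReconciling() { return reconciling; }
    boolean isGraphFailed() { return graphFailed; }

    public static void main(String[] args) {
        ReconciliationSketch g = new ReconciliationSketch(Arrays.asList("a", "b", "c"));
        System.out.println(g.onReport("a", ExecState.RUNNING)); // true
        System.out.println(g.onReport("b", ExecState.FAILED));  // true, fails the graph
        System.out.println(g.stateOf("c"));                     // CANCELED
        System.out.println(g.onReport("c", ExecState.RUNNING)); // false, refused
    }
}
```

Keeping the whole decision inside `onReport` is what makes a single phase possible here: there is no separate step that waits for upstream executions before acting, which matches the assumption that EAGER scheduling already delivers final partition infos at submission time.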


> ExecutionGraph recovery based on reconciliation with TaskManager reports
> ------------------------------------------------------------------------
>
>                 Key: FLINK-5703
>                 URL: https://issues.apache.org/jira/browse/FLINK-5703
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, JobManager
>            Reporter: zhijiang
>            Assignee: zhijiang
>
> The ExecutionGraph structure would be recovered from TaskManager reports during the reconciling period. The necessary information includes:
>     - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor (consumer Execution)
>     - ExecutionVertex: Map<IntermediateResultPartitionID, IntermediateResultPartition>
>     - ExecutionGraph: ConcurrentHashMap<ExecutionAttemptID, Execution>
> An execution in the {{RECONCILING}} ExecutionState should transition into one of the existing task states ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do so, the TaskManager should retain terminal task states ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; we plan to implement this mechanism in another JIRA issue. In addition, state transitions trigger different actions, and some of these actions rely on the information above. Because of this constraint, the recovery process is divided into two steps:
>     - First, recover all the necessary information except the ExecutionState.
>     - Second, transition the ExecutionState into the real task state and trigger the corresponding actions. The behavior is the same as the current {{UpdateTaskExecutionState}}.
> To keep the logic simple and consistent, all other RPC messages from the TaskManager ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) should be temporarily refused during the recovery period, and the JobMaster should respond with a special message. The TaskManager should then keep retrying these messages until the JobManager finishes recovery and acknowledges them.
> A job in the {{RECONCILING}} JobStatus transitions into one of the states ({{RUNNING}}, {{FAILING}}, {{FINISHED}}) after recovery:
>     - {{RECONCILING}} to {{RUNNING}}: all TaskManagers report within the allowed time, and all tasks are in the {{RUNNING}} state.
>     - {{RECONCILING}} to {{FAILING}}: a TaskManager does not report in time, or a task is in the {{FAILED}} or {{CANCELED}} state.
>     - {{RECONCILING}} to {{FINISHED}}: all TaskManagers report within the allowed time, and all tasks are in the {{FINISHED}} state.
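The JobStatus rules above amount to a small decision function over the collected reports. The sketch below is only an illustration under stated assumptions: `ReconcileOutcome`, `decide`, `TaskState` and `JobStatus` are hypothetical names (not Flink's own `JobStatus` or `ExecutionState` classes), and because the description does not say what should happen when tasks are a mix of RUNNING and FINISHED, the sketch returns an empty result for that case instead of guessing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical decision function for the JobStatus after reconciliation (not Flink code). */
class ReconcileOutcome {

    enum TaskState { RUNNING, FINISHED, CANCELED, FAILED }

    enum JobStatus { RUNNING, FAILING, FINISHED }

    /**
     * @param allReportedInTime whether every TaskManager reported within the allowed time
     * @param states            task states collected from the TaskManager reports
     * @return the new JobStatus, or empty if none of the three rules applies
     */
    static Optional<JobStatus> decide(boolean allReportedInTime, List<TaskState> states) {
        if (!allReportedInTime) {
            return Optional.of(JobStatus.FAILING); // a TaskManager missed the deadline
        }
        boolean allRunning = true;
        boolean allFinished = true;
        for (TaskState s : states) {
            if (s == TaskState.FAILED || s == TaskState.CANCELED) {
                return Optional.of(JobStatus.FAILING); // any failed or canceled task
            }
            allRunning &= s == TaskState.RUNNING;
            allFinished &= s == TaskState.FINISHED;
        }
        if (allRunning) {
            return Optional.of(JobStatus.RUNNING);
        }
        if (allFinished) {
            return Optional.of(JobStatus.FINISHED);
        }
        // Mixed RUNNING/FINISHED tasks: not covered by the rules in the description.
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(decide(true, Arrays.asList(TaskState.RUNNING, TaskState.RUNNING)));  // Optional[RUNNING]
        System.out.println(decide(false, Arrays.asList(TaskState.RUNNING)));                    // Optional[FAILING]
        System.out.println(decide(true, Arrays.asList(TaskState.RUNNING, TaskState.FINISHED))); // Optional.empty
    }
}
```

Checking the FAILING conditions before the all-RUNNING and all-FINISHED checks mirrors the fail-fast behavior discussed in the comment: a single missed deadline or failed task decides the outcome regardless of the other reports.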



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
