[
https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885860#comment-15885860
]
ASF GitHub Bot commented on FLINK-5703:
---------------------------------------
Github user zhijiangW commented on the issue:
https://github.com/apache/flink/pull/3340
@StephanEwen, thanks for the professional review!
I have attached the design doc to this JIRA issue to explain the implementation
considerations:
[https://docs.google.com/document/d/1rm3BYQyC-8GrLgdwSUAbkJ0aV8IaCneF6c5dk4G3SXY/edit](https://docs.google.com/document/d/1rm3BYQyC-8GrLgdwSUAbkJ0aV8IaCneF6c5dk4G3SXY/edit)
Regarding your suggestions above:
1. I totally agree with your point about keeping the current final variables
related to **Execution**, **IntermediateResult**, etc. Although I already
tried to avoid concurrency issues for these variables in my implementation,
creating new instances to replace the previous ones is still the better choice,
and I am willing to re-implement the related logic accordingly.
2. In JobManagerRunner **grantLeadership**, the new job leader should not be
exposed to TaskManagers before reconciliation, so the reconciliation process in
the JobManager will be called before **confirmLeadership** to avoid this issue.
As you suggest, creating the JobManager and ExecutionGraph during
reconciliation would also make sense. To keep the changes minimal, I would keep
the current approach for this issue unless there are other reasons against it.
What do you think?
3. The two phases of the reconciliation mainly concern actions triggered
during state transitions. For example, the transition from **RECONCILING** to
**RUNNING** has to call **sendPartitionInfos** in the previous logic, and this
action relies on the reconciliation of the upstream executions. In other words,
the reconciliation of an upstream execution tries to cache the partition infos
in the **PartialInputChannelDeploymentDescriptor** of the downstream execution,
and when the downstream execution switches to **RUNNING**, these partition
infos are sent to it. Because the order in which executions are reconciled
cannot be guaranteed, dividing the whole process into two steps makes sense.
But after your latest improvements for the **EAGER** **ScheduleMode**, I think
the downstream task is already submitted with the final upstream partition
infos, so there may be no need to send the partition infos to a running
downstream task again. If my understanding is correct, I can re-implement this
part and merge the current two phases into one based on the TaskManager reports.
In addition, a reported FAILED or CANCELED execution state would fail the
whole ExecutionGraph, so we can end the reconciliation period immediately. Any
subsequent reports will then be refused, and the corresponding tasks will be
failed by the TaskManagers. On the JobManager side, when the first reported
FAILED or CANCELED state triggers failing the ExecutionGraph, the other
executions still in the RECONCILING state will transition to CANCELED directly,
with no need to send a cancel RPC message. What do you think?
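To make the fail-fast proposal above concrete, here is a minimal sketch of the bookkeeping it implies. It is not Flink code: the class `ReconciliationSketch`, its nested `State` enum, and the string attempt IDs are hypothetical stand-ins for the ExecutionGraph, ExecutionState, and ExecutionAttemptID, and "refusing" a report is modeled simply as returning `false`.

```java
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the proposed fail-fast reconciliation.
class ReconciliationSketch {

    // Stand-in for Flink's ExecutionState; only the states discussed here.
    enum State { RECONCILING, RUNNING, FINISHED, CANCELED, FAILED }

    private static final EnumSet<State> FATAL = EnumSet.of(State.FAILED, State.CANCELED);

    private final Map<String, State> executions = new LinkedHashMap<>();
    private boolean reconciling = true;
    private boolean graphFailed = false;

    ReconciliationSketch(String... attemptIds) {
        for (String id : attemptIds) {
            executions.put(id, State.RECONCILING);
        }
    }

    /** Applies a TaskManager report; returns false if the report is refused. */
    boolean report(String attemptId, State reported) {
        if (!reconciling
                || reported == State.RECONCILING
                || executions.get(attemptId) != State.RECONCILING) {
            return false; // period already over, or unknown/already reconciled execution
        }
        executions.put(attemptId, reported);
        if (FATAL.contains(reported)) {
            // First FAILED/CANCELED report: fail the graph and end the period at once.
            graphFailed = true;
            reconciling = false;
            // Remaining executions go straight to CANCELED; no cancel RPC is modeled.
            executions.replaceAll((id, s) -> s == State.RECONCILING ? State.CANCELED : s);
        } else if (!executions.containsValue(State.RECONCILING)) {
            reconciling = false; // every execution has reported successfully
        }
        return true;
    }

    State stateOf(String attemptId) { return executions.get(attemptId); }
    boolean isReconciling() { return reconciling; }
    boolean isGraphFailed() { return graphFailed; }
}
```

With three executions `a`, `b`, `c`, reporting RUNNING for `a` and then FAILED for `b` leaves `c` in CANCELED, and a later report for `c` is refused, which in the proposal is the point where the TaskManager would fail the task on its side.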
I will finish the above modifications this week, and any further advice is
welcome!
> ExecutionGraph recovery based on reconciliation with TaskManager reports
> ------------------------------------------------------------------------
>
> Key: FLINK-5703
> URL: https://issues.apache.org/jira/browse/FLINK-5703
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, JobManager
> Reporter: zhijiang
> Assignee: zhijiang
>
> The ExecutionGraph structure would be recovered from TaskManager reports
> during the reconciliation period, and the necessary information includes:
> - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp,
> ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor(Consumer
> Execution)
> - ExecutionVertex: Map<IntermediateResultPartitionID,
> IntermediateResultPartition>
> - ExecutionGraph: ConcurrentHashMap<ExecutionAttemptID, Execution>
> The {{RECONCILING}} ExecutionState should transition into any of the existing
> task states ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do so, the
> TaskManager should maintain the terminal task state
> ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; we will try to realize this
> mechanism in another JIRA issue. In addition, the state transition would trigger
> different actions, and some actions rely on the necessary information above.
> Considering this limitation, the recovery process will be divided into two steps:
> - First, recover all the other necessary information except the ExecutionState.
> - Second, transition the ExecutionState into the real task state and trigger
> actions. The behavior is the same as the current {{UpdateTaskExecutionState}}.
> To keep the logic simple and consistent, during the recovery period all other RPC
> messages ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.)
> from the TaskManager should be refused temporarily and answered with a special
> message by the JobMaster. The TaskManager should then retry sending these
> messages until the JobManager ends recovery and acknowledges them.
> The {{RECONCILING}} JobStatus would transition into one of the states
> ({{RUNNING}}, {{FAILING}}, {{FINISHED}}) after recovery.
> - {{RECONCILING}} to {{RUNNING}}: All the TaskManagers report within the
> duration time and all the tasks are in the {{RUNNING}} state.
> - {{RECONCILING}} to {{FAILING}}: One of the TaskManagers does not report
> in time, or one of the tasks is in the {{FAILED}} or {{CANCELED}} state.
> - {{RECONCILING}} to {{FINISHED}}: All the TaskManagers report within the
> duration time and all the tasks are in the {{FINISHED}} state.
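The three JobStatus transitions listed in the description can be sketched as a single decision function. This is only an illustration: `JobStatusAfterReconciliation`, `decide`, and the two enums are hypothetical names, not Flink classes. The description does not say what happens when the reports mix RUNNING and FINISHED tasks; the sketch assumes the job then continues as RUNNING.

```java
import java.util.Collection;

// Hypothetical helper that mirrors the transition rules in the issue description.
final class JobStatusAfterReconciliation {

    enum TaskState { RUNNING, FINISHED, CANCELED, FAILED }

    enum JobStatus { RUNNING, FAILING, FINISHED }

    /**
     * @param allReportedInTime whether every TaskManager reported within the duration time
     * @param taskStates        the task states collected from the TaskManager reports
     */
    static JobStatus decide(boolean allReportedInTime, Collection<TaskState> taskStates) {
        if (!allReportedInTime) {
            return JobStatus.FAILING; // a TaskManager missed the reporting deadline
        }
        boolean allFinished = true;
        for (TaskState state : taskStates) {
            if (state == TaskState.FAILED || state == TaskState.CANCELED) {
                return JobStatus.FAILING; // a single failed or canceled task fails the job
            }
            if (state != TaskState.FINISHED) {
                allFinished = false;
            }
        }
        // Assumption: a mix of RUNNING and FINISHED tasks keeps the job RUNNING.
        return allFinished ? JobStatus.FINISHED : JobStatus.RUNNING;
    }
}
```

For example, `decide(true, ...)` over two RUNNING tasks yields RUNNING, while `decide(false, ...)` yields FAILING regardless of the reported task states.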
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)