[ https://issues.apache.org/jira/browse/FLINK-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883298#comment-15883298 ]
ASF GitHub Bot commented on FLINK-5703:
---------------------------------------
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/3340
There is a lot of good code in this PR.
What I would suggest doing differently is to NOT make `Execution`,
`IntermediateResultPartition`, etc. mutable. There is a big benefit to keeping
them immutable, both for safety against accidental bugs and from a
concurrency standpoint. Regular mutable, non-volatile fields have visibility
problems under concurrent access, which leads to very subtle bugs that are hard to
trace and recover from:
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#MemoryVisibility
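A minimal Java sketch of the difference, using hypothetical stand-in types (`ExecState`, `ExecutionSnapshot`, and `MutableExecution` are illustrative, not Flink classes). With all fields `final` and no setters, the Java Memory Model guarantees that any thread obtaining a reference to a fully constructed object sees the initialized field values (provided `this` does not escape the constructor). A plain non-volatile field has no such guarantee unless there is a happens-before edge such as a `volatile` write/read or a lock.

```java
import java.util.Objects;

// Hypothetical, simplified state enum; not Flink's ExecutionState.
enum ExecState { RECONCILING, RUNNING, FINISHED, CANCELED, FAILED }

// Immutable: all fields final, no setters. Final-field semantics make the
// initialized values visible to any thread that sees the constructed object.
final class ExecutionSnapshot {
    private final String attemptId; // hypothetical stand-in for ExecutionAttemptID
    private final int attemptNumber;
    private final ExecState state;

    ExecutionSnapshot(String attemptId, int attemptNumber, ExecState state) {
        this.attemptId = Objects.requireNonNull(attemptId);
        this.attemptNumber = attemptNumber;
        this.state = Objects.requireNonNull(state);
    }

    String attemptId() { return attemptId; }
    int attemptNumber() { return attemptNumber; }
    ExecState state() { return state; }

    // "Changing" the state creates a new object; the old one is never mutated,
    // so a thread still holding it sees a consistent (possibly stale) view.
    ExecutionSnapshot withState(ExecState newState) {
        return new ExecutionSnapshot(attemptId, attemptNumber, newState);
    }
}

// Counter-example: a plain mutable, non-volatile field. A write by one thread
// is not guaranteed to become visible to another thread without a
// happens-before edge (volatile, a lock, a concurrent collection, ...).
final class MutableExecution {
    ExecState state = ExecState.RECONCILING; // not volatile: no visibility guarantee
}
```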
For `Execution`, I would change it so that the `ExecutionVertex` swaps
the current `Execution` (in state `RECONCILING`) for a new `Execution`
that has all the fields properly set. That way there is a quasi-atomic
replacement of the previous execution, which is safer and easier to reason
about.
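The swap could look roughly like this, assuming the vertex holds its current execution in a `java.util.concurrent.atomic.AtomicReference`. `Vertex`, `Exec`, and `ExecState` are hypothetical simplifications, not Flink's `ExecutionVertex`/`Execution`. `compareAndSet` only succeeds if the placeholder is still the current value, so a concurrent replacement (for example by a failure) is never silently overwritten.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified state enum; not Flink's ExecutionState.
enum ExecState { RECONCILING, RUNNING, FINISHED, CANCELED, FAILED }

// Hypothetical immutable execution; all fields are set once, at construction.
final class Exec {
    final String attemptId;
    final ExecState state;
    final String slot; // hypothetical stand-in for the assigned SimpleSlot

    Exec(String attemptId, ExecState state, String slot) {
        this.attemptId = attemptId;
        this.state = state;
        this.slot = slot;
    }
}

// Hypothetical vertex that publishes its current execution atomically.
final class Vertex {
    private final AtomicReference<Exec> current;

    Vertex(Exec placeholder) {
        this.current = new AtomicReference<>(placeholder);
    }

    Exec currentExecution() {
        return current.get();
    }

    // Replaces the RECONCILING placeholder with a fully built execution.
    // Returns false if the placeholder is no longer current (already reconciled
    // or replaced concurrently); the report is then ignored.
    boolean reconcile(Exec reported) {
        Exec placeholder = current.get();
        if (placeholder.state != ExecState.RECONCILING) {
            return false;
        }
        return current.compareAndSet(placeholder, reported);
    }
}
```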
It may even be worth creating the JobManager (from the JobManagerRunner)
and the ExecutionGraph directly in reconciliation mode. That way, you don't
have to worry about what to do with TaskManagers that register while the JobMaster
has not yet switched to reconciliation.
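A hypothetical sketch of constructing the graph directly in reconciliation mode: the initial status is fixed by a factory method, so there is no window in which a registering TaskManager meets a graph that has not yet switched to reconciliation. `GraphSketch`, `JobStatusSketch`, and the factory methods are illustrative names, not Flink API; `synchronized` is used only to keep the sketch simple.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified job status; not Flink's JobStatus enum.
enum JobStatusSketch { CREATED, RECONCILING, RUNNING }

// Hypothetical graph whose initial status is decided at construction time.
final class GraphSketch {
    private JobStatusSketch status;
    private final List<String> reconcilingTaskManagers = new ArrayList<>();
    private final List<String> regularTaskManagers = new ArrayList<>();

    private GraphSketch(JobStatusSketch initial) {
        this.status = initial;
    }

    // Normal job submission: start from scratch.
    static GraphSketch forNewJob() {
        return new GraphSketch(JobStatusSketch.CREATED);
    }

    // Recovery after a JobManager failover: start directly in RECONCILING,
    // so no registration can observe a not-yet-reconciling graph.
    static GraphSketch forRecovery() {
        return new GraphSketch(JobStatusSketch.RECONCILING);
    }

    synchronized void onTaskManagerRegistered(String taskManagerId) {
        if (status == JobStatusSketch.RECONCILING) {
            reconcilingTaskManagers.add(taskManagerId); // its task report feeds reconciliation
        } else {
            regularTaskManagers.add(taskManagerId);     // ordinary registration path
        }
    }

    synchronized void finishReconciliation() {
        if (status == JobStatusSketch.RECONCILING) {
            status = JobStatusSketch.RUNNING;
        }
    }

    synchronized JobStatusSketch status() { return status; }
    synchronized List<String> reconcilingTaskManagers() { return new ArrayList<>(reconcilingTaskManagers); }
    synchronized List<String> regularTaskManagers() { return new ArrayList<>(regularTaskManagers); }
}
```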
Can you explain a bit why the reconciliation needs the two phases:
1. Receive reports, set the execution attempt and state
2. Once all are there, transition the state to running
Could it be done such that, on each report, the `Execution` is set directly to the
reported state? Then, once all reports are in, everything is already consistent. After
the reconciliation timeout expires, you would just iterate over all executions
and `fail()` the ones that are still in `RECONCILING`; the recovery logic takes care
of the rest.
That may also handle the case where some already reconciled tasks fail
while reconciliation is still in progress. The ExecutionGraph would then already recover
the local tasks that are affected by the failure.
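A minimal sketch of the single-phase variant, assuming executions are tracked in a map keyed by a hypothetical attempt-id string (`SinglePhaseReconciler` and its methods are illustrative, not Flink API). Each report is applied immediately, the timeout fails whatever is still `RECONCILING`, and an execution that has already been reconciled can fail at any time without waiting for the remaining reports. The actual failover after `FAILED` is not modeled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified state enum; not Flink's ExecutionState.
enum ExecState { RECONCILING, RUNNING, FINISHED, CANCELED, FAILED }

// Hypothetical single-phase reconciler: no separate "apply states" phase.
final class SinglePhaseReconciler {
    // Insertion-ordered so iteration is deterministic.
    private final Map<String, ExecState> executions = new LinkedHashMap<>();

    SinglePhaseReconciler(Iterable<String> expectedAttemptIds) {
        for (String id : expectedAttemptIds) {
            executions.put(id, ExecState.RECONCILING);
        }
    }

    // A report sets the reported state right away; unknown ids and
    // already reconciled executions are ignored.
    synchronized void onReport(String attemptId, ExecState reported) {
        if (executions.get(attemptId) == ExecState.RECONCILING) {
            executions.put(attemptId, reported);
        }
    }

    // A failure of an already reconciled execution is applied immediately,
    // even while other executions are still RECONCILING.
    synchronized void onTaskFailure(String attemptId) {
        if (executions.get(attemptId) == ExecState.RUNNING) {
            executions.put(attemptId, ExecState.FAILED);
        }
    }

    synchronized boolean allReported() {
        return !executions.containsValue(ExecState.RECONCILING);
    }

    // Called when the reconciliation timeout fires: fail every straggler and
    // leave the rest to the regular recovery logic. Returns how many were failed.
    synchronized int onTimeout() {
        int failed = 0;
        for (Map.Entry<String, ExecState> e : executions.entrySet()) {
            if (e.getValue() == ExecState.RECONCILING) {
                e.setValue(ExecState.FAILED);
                failed++;
            }
        }
        return failed;
    }

    synchronized ExecState stateOf(String attemptId) {
        return executions.get(attemptId);
    }
}
```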
> ExecutionGraph recovery based on reconciliation with TaskManager reports
> ------------------------------------------------------------------------
>
> Key: FLINK-5703
> URL: https://issues.apache.org/jira/browse/FLINK-5703
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, JobManager
> Reporter: zhijiang
> Assignee: zhijiang
>
> The ExecutionGraph structure would be recovered from TaskManager reports
> during the reconciling period, and the necessary information includes:
> - Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp,
> ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor (Consumer
> Execution)
> - ExecutionVertex: Map<IntermediateResultPartitionID,
> IntermediateResultPartition>
> - ExecutionGraph: ConcurrentHashMap<ExecutionAttemptID, Execution>
> An execution in the {{RECONCILING}} ExecutionState should transition into one of the existing
> task states ({{RUNNING}}, {{CANCELED}}, {{FAILED}}, {{FINISHED}}). To do so, the
> TaskManager should retain terminal task states
> ({{CANCELED}}, {{FAILED}}, {{FINISHED}}) for a while; we will try to implement this
> mechanism in a separate JIRA issue. In addition, each state transition triggers
> different actions, and some of these actions rely on the information listed above.
> Given this constraint, the recovery process will be divided into two steps:
> - First, recover all other necessary information except the ExecutionState.
> - Second, transition the ExecutionState into the real task state and trigger
> the corresponding actions. The behavior is the same as the current {{UpdateTaskExecutionState}} handling.
> To keep the logic simple and consistent, during the recovery period the JobMaster should
> temporarily refuse all other RPC messages from the TaskManager
> ({{UpdateTaskExecutionState}}, {{ScheduleOrUpdateConsumers}}, etc.) and respond with a special
> message. The TaskManager should then keep retrying these messages until the JobManager
> has finished recovery and acknowledges them.
> After recovery, the {{RECONCILING}} JobStatus transitions into one of the states
> ({{RUNNING}}, {{FAILING}}, {{FINISHED}}):
> - {{RECONCILING}} to {{RUNNING}}: all TaskManagers report within the
> reconciliation period and all tasks are in {{RUNNING}} state.
> - {{RECONCILING}} to {{FAILING}}: at least one TaskManager does not report
> in time, or at least one task is in {{FAILED}} or {{CANCELED}} state.
> - {{RECONCILING}} to {{FINISHED}}: all TaskManagers report within the
> reconciliation period and all tasks are in {{FINISHED}} state.
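The three JobStatus transition rules from the issue description can be expressed as a small pure function. This is a sketch with hypothetical enums (`TaskState`, `JobOutcome`), not Flink's `ExecutionState`/`JobStatus`. The rules do not say what happens when tasks are a mix of `RUNNING` and `FINISHED`, so treating that case as `RUNNING` is an assumption.

```java
import java.util.Collection;

// Hypothetical, simplified enums; not Flink's ExecutionState / JobStatus.
enum TaskState { RUNNING, FINISHED, CANCELED, FAILED }
enum JobOutcome { RUNNING, FAILING, FINISHED }

final class ReconciliationOutcome {
    private ReconciliationOutcome() {}

    // allReportedInTime: every expected TaskManager reported within the
    // reconciliation period. taskStates: the reported state of every task.
    static JobOutcome decide(boolean allReportedInTime, Collection<TaskState> taskStates) {
        if (!allReportedInTime) {
            return JobOutcome.FAILING; // a TaskManager did not report in time
        }
        boolean allFinished = true;
        for (TaskState s : taskStates) {
            if (s == TaskState.FAILED || s == TaskState.CANCELED) {
                return JobOutcome.FAILING; // any failed or canceled task
            }
            if (s != TaskState.FINISHED) {
                allFinished = false;
            }
        }
        // Assumption: a mix of RUNNING and FINISHED tasks keeps the job RUNNING.
        return allFinished ? JobOutcome.FINISHED : JobOutcome.RUNNING;
    }
}
```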
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)