[
https://issues.apache.org/jira/browse/FLINK-17075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17143818#comment-17143818
]
Chesnay Schepler commented on FLINK-17075:
------------------------------------------
h3. Problem description:
The {{JobMaster}} keeps track of the state of all {{Executions}} belonging to a
job. After deployment, this tracking relies on updates from the
{{TaskExecutors}}, transmitted via dedicated RPC messages.
If one such message is lost then the tracked state may no longer match the
actual one. In the worst case this prevents an {{Execution}} from ever reaching
a terminal state, which in turn prevents the job from terminating.
h3. Proposed solution:
To prevent the worst case from happening, we propose that the {{TaskExecutor}}
also submit a report of all currently deployed {{Tasks}} (identified by their
{{ExecutionAttemptID}}) with each heartbeat. This allows us to detect
discrepancies between the sets of executions known to the {{JobMaster}} and the
{{TaskExecutor}}, and act accordingly.
In the end this boils down to comparing two {{Set<ExecutionAttemptID>}} instances.
If an execution exists only in the {{JobMaster}} set, then the execution was
dropped by the {{TaskExecutor}}.
This could imply a loss of a terminal state transition.
We cannot determine which terminal state the task has reached, since all
information was already cleaned up.
In this case we will fail the execution in the {{ExecutionGraph}}, typically
resulting in a restart.
If an execution exists only in the {{TaskExecutor}} set, then some leftover
task from a previous attempt is still running on the {{TaskExecutor}}.
In this case we will cancel the task on the {{TaskExecutor}}; the running job is
unaffected.
If an execution exists in both sets, then we don't do anything.
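As a rough illustration, the comparison could look like the following sketch
(plain {{java.util}} sets; {{tracker}} and {{deploymentReport}} are placeholders
for the components introduced below):
{code}
// Sketch of the two set differences described above; names are illustrative.
Set<ExecutionAttemptID> knownByJobMaster = tracker.getExecutions();
Set<ExecutionAttemptID> reportedByTaskExecutor = deploymentReport.getExecutions();

// Tracked by the JobMaster but missing on the TaskExecutor:
// a terminal state transition was likely lost -> fail the execution.
Set<ExecutionAttemptID> missing = new HashSet<>(knownByJobMaster);
missing.removeAll(reportedByTaskExecutor);

// Reported by the TaskExecutor but unknown to the JobMaster:
// leftover task from a previous attempt -> cancel it on the TaskExecutor.
Set<ExecutionAttemptID> unknown = new HashSet<>(reportedByTaskExecutor);
unknown.removeAll(knownByJobMaster);
{code}
In practice the JobMaster-side set would be restricted to the executions
deployed onto the reporting {{TaskExecutor}}.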
h4. Required changes:
{{TaskExecutor}}
------------------------------
The existing {{TaskSlotTable}} supports iterating over all {{Tasks}} for a
given {{JobID}}, allowing us to extract the {{ExecutionAttemptID}}.
From this we generate a {{Set<ExecutionAttemptID>}} and submit it via
heartbeats.
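As a sketch (the exact {{TaskSlotTable}} accessors may differ from what is
shown here):
{code}
// Sketch: collect the ExecutionAttemptIDs of all tasks deployed for the job.
Set<ExecutionAttemptID> deployedExecutions = new HashSet<>();
Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
while (tasks.hasNext()) {
    deployedExecutions.add(tasks.next().getExecutionId());
}
// The resulting set is attached to the heartbeat payload sent to the JobMaster.
{code}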
{{JobMaster}}
------------------------------
Here we need to be able to:
a) (un)track actually deployed {{Executions}}
b) cancel tasks on the {{TaskExecutor}}
c) fail tasks in the {{ExecutionGraph}}
These capabilities are split across two new components:
1) ExecutionDeploymentTracker
2) ExecutionDeploymentReconciler
1) The tracker lives in the Scheduler, with the following interface:
{code}
public interface ExecutionDeploymentTracker {
    void startTrackingDeployment(ExecutionAttemptID deployment);
    void stopTrackingDeployment(ExecutionAttemptID deployment);
    Set<ExecutionAttemptID> getExecutions();
}
{code}
It's basically a {{Set<ExecutionAttemptID>}}.
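A minimal implementation could thus look like this (sketch; the class name is
hypothetical):
{code}
// Hypothetical set-backed implementation of the tracker interface above.
public class DefaultExecutionDeploymentTracker implements ExecutionDeploymentTracker {

    private final Set<ExecutionAttemptID> deployments = new HashSet<>();

    @Override
    public void startTrackingDeployment(ExecutionAttemptID deployment) {
        deployments.add(deployment);
    }

    @Override
    public void stopTrackingDeployment(ExecutionAttemptID deployment) {
        deployments.remove(deployment);
    }

    @Override
    public Set<ExecutionAttemptID> getExecutions() {
        // defensive copy so callers cannot mutate the tracked state
        return new HashSet<>(deployments);
    }
}
{code}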
The tracker is notified by the {{ExecutionGraph}} of deployed/finished
executions through 2 new listeners:
{code}
public interface ExecutionDeploymentListener {
    void onCompletedDeployment(ExecutionAttemptID execution);
}

public interface ExecutionStateUpdateListener {
    void onStateUpdate(ExecutionAttemptID execution, ExecutionState newState);
}
{code}
{{onCompletedDeployment}} is called in {{Execution#deploy}} when the deployment
future completes; an implementation will initiate the tracking.
{{onStateUpdate}} is called in {{Execution#transitionState}} on any successful
state transition; an implementation will stop the tracking if the new state is
a terminal one.
Note: The deployment listener is required since there is no dedicated state for
a deployed task; executions are switched to DEPLOYING, submitted to the
{{TaskExecutor}}, and switched to RUNNING after an update from the
{{TaskExecutor}}. Since this update can be lost we cannot rely on it.
A dedicated DEPLOYED state would be preferable, but this would require too many
changes to the {{ExecutionGraph}} at this time.
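To illustrate the intended wiring, both listeners could simply delegate to the
tracker (sketch; assumes {{ExecutionState#isTerminal}} as the terminal-state
check):
{code}
// Sketch: listener implementations delegating to a shared tracker instance.
ExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

ExecutionDeploymentListener deploymentListener =
    execution -> tracker.startTrackingDeployment(execution);

ExecutionStateUpdateListener stateUpdateListener =
    (execution, newState) -> {
        if (newState.isTerminal()) {
            tracker.stopTrackingDeployment(execution);
        }
    };
{code}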
2) The reconciler lives in the {{JobMaster}} and uses the IDs provided by the
tracker and {{TaskExecutor}} heartbeats to detect mismatches, and fire events
accordingly.
By defining a {{ReconciliationHandler}} the {{JobMaster}} can decide how each
case should be handled:
{code}
public interface ExecutionDeploymentReconciler {

    // conceptual factory interface
    interface Factory {
        ExecutionDeploymentReconciler get(ReconciliationHandler trigger);
    }

    void reconcileExecutionStates(
        ResourceID origin,
        DeploymentReport deploymentReport,
        Set<ExecutionAttemptID> knownExecutionAttemptsIds);

    interface ExecutionIdsProvider {
        Set<ExecutionAttemptID> getExecutions();
    }

    interface ReconciliationHandler {
        // fail the execution in the ExecutionGraph
        void onMissingDeployment(ExecutionAttemptID deployment);

        // cancel the task on the TaskExecutor
        void onUnknownDeployment(ExecutionAttemptID deployment, ResourceID hostingTaskExecutor);
    }
}
{code}
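A possible implementation of {{reconcileExecutionStates}} then combines the set
comparison from above with the handler callbacks (sketch; assumes the
{{DeploymentReport}} exposes the reported IDs as a set):
{code}
// Sketch: compare the reported and tracked sets and fire the handler callbacks.
public void reconcileExecutionStates(
        ResourceID origin,
        DeploymentReport deploymentReport,
        Set<ExecutionAttemptID> knownExecutionAttemptsIds) {

    Set<ExecutionAttemptID> reported = deploymentReport.getExecutions();

    for (ExecutionAttemptID known : knownExecutionAttemptsIds) {
        if (!reported.contains(known)) {
            // tracked by the JobMaster, but gone on the TaskExecutor
            handler.onMissingDeployment(known);
        }
    }
    for (ExecutionAttemptID reportedId : reported) {
        if (!knownExecutionAttemptsIds.contains(reportedId)) {
            // running on the TaskExecutor, but unknown to the JobMaster
            handler.onUnknownDeployment(reportedId, origin);
        }
    }
}
{code}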
> Add task status reconciliation between TM and JM
> ------------------------------------------------
>
> Key: FLINK-17075
> URL: https://issues.apache.org/jira/browse/FLINK-17075
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.10.0, 1.11.0
> Reporter: Till Rohrmann
> Assignee: Chesnay Schepler
> Priority: Critical
> Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> In order to harden the TM and JM communication I suggest letting the
> {{TaskExecutor}} send the task statuses back to the {{JobMaster}} as part of
> the heartbeat payload (similar to FLINK-11059). This would allow reconciling
> the states of both components in case a status update message was lost,
> as described by a user on the ML.
> https://lists.apache.org/thread.html/ra9ed70866381f0ef0f4779633346722ccab3dc0d6dbacce04080b74e%40%3Cuser.flink.apache.org%3E