Ufuk Celebi created FLINK-10439:
-----------------------------------
Summary: Race condition during job suspension
Key: FLINK-10439
URL: https://issues.apache.org/jira/browse/FLINK-10439
Project: Flink
Issue Type: Bug
Components: Distributed Coordination
Affects Versions: 1.7.0
Reporter: Ufuk Celebi
Attachments: master-logs.log, race-job-suspension.png, worker-logs.log
When a {{JobMaster}} in an HA setup loses leadership, it suspends the
execution of its job via {{JobMaster.suspend(Exception, Time)}}. This operation
involves transitioning to the {{SUSPENDING}} job state and cancelling all
running tasks. In some executions it may happen that the job does *not* reach
the terminal {{SUSPENDED}} job state.
The cause is that suspending the job stops related RPC endpoints such as the
{{JobMaster}} or {{SlotPool}} (in {{JobMaster.suspend(Exception, Time)}} and
{{JobMaster.suspendExecution(Exception)}}) immediately after the suspension is
triggered. Whenever this happens *before* the {{TaskExecutor}} instances have
cancelled or failed the respective tasks, the job does not transition to
{{SUSPENDED}}, because the {{ExecutionGraph}} does not receive all
{{Execution}} state transitions.
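To illustrate the suspected timing, here is a minimal, self-contained Java
sketch. It is *not* Flink code: {{ToyEndpoint}}, {{ToyGraph}}, and {{run}} are
hypothetical stand-ins that only model the two orderings described above,
assuming that a stopped endpoint silently drops incoming messages and that the
graph reaches {{SUSPENDED}} only after every task has reported back.

```java
class SuspensionRaceSketch {

    enum JobState { RUNNING, SUSPENDING, SUSPENDED }

    /** Hypothetical stand-in for an RPC endpoint: drops messages once stopped. */
    static final class ToyEndpoint {
        private boolean running = true;
        int discarded = 0; // number of messages dropped after stop()

        void stop() {
            running = false;
        }

        void deliver(Runnable message) {
            if (running) {
                message.run();
            } else {
                discarded++; // models "Discarding message ..." in the logs
            }
        }
    }

    /** Hypothetical stand-in for the ExecutionGraph. */
    static final class ToyGraph {
        JobState state = JobState.RUNNING;
        private int pendingTasks;

        ToyGraph(int tasks) {
            this.pendingTasks = tasks;
        }

        void suspend() {
            state = JobState.SUSPENDING;
        }

        /** Called when a task reports that it was cancelled or failed. */
        void taskCancelled() {
            pendingTasks--;
            if (state == JobState.SUSPENDING && pendingTasks == 0) {
                state = JobState.SUSPENDED;
            }
        }
    }

    /** Runs one suspension and returns the job state the graph ends up in. */
    static JobState run(boolean stopEndpointBeforeAcks) {
        int tasks = 3;
        ToyGraph graph = new ToyGraph(tasks);
        ToyEndpoint endpoint = new ToyEndpoint();

        graph.suspend();
        if (stopEndpointBeforeAcks) {
            endpoint.stop(); // endpoint stopped right after suspension starts
        }
        for (int i = 0; i < tasks; i++) {
            endpoint.deliver(graph::taskCancelled); // task state updates arrive
        }
        if (!stopEndpointBeforeAcks) {
            endpoint.stop(); // endpoint stopped only after all updates arrived
        }
        return graph.state;
    }

    public static void main(String[] args) {
        System.out.println("endpoint stopped first: " + run(true));  // SUSPENDING
        System.out.println("updates delivered first: " + run(false)); // SUSPENDED
    }
}
```

In this toy model the job is stuck in {{SUSPENDING}} whenever the endpoint is
stopped before the task updates arrive, which mirrors the behaviour reported
here. The real code paths are asynchronous and more involved.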
In practice, this should not happen frequently because {{JobMaster}}
and {{TaskExecutor}} instances are notified about the loss of leadership (or
loss of ZooKeeper connection or similar events) around the same time. In this
scenario, the {{TaskExecutor}} instances proactively fail the executing tasks
and notify the {{JobMaster}}. All in all, the impact of this is limited by the
fact that a new {{JobMaster}} leader will eventually recover the job.
*Steps to reproduce*:
- Start ZooKeeper
- Start a Flink cluster in HA mode and submit a job
- Stop ZooKeeper
In some executions you will find that the job does not reach the terminal state
{{SUSPENDED}}. Furthermore, you may see log messages similar to the following
in this case:
{code}
The rpc endpoint org.apache.flink.runtime.jobmaster.slotpool.SlotPool has not
been started yet. Discarding message
org.apache.flink.runtime.rpc.messages.LocalRpcInvocation until processing is
started.
{code}
I've attached logs of a local run that does not transition to {{SUSPENDED}}
and a sequence diagram of what I think may be a problematic timing.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)