Elias Levy created FLINK-10011:
----------------------------------
Summary: Old job resurrected during HA failover
Key: FLINK-10011
URL: https://issues.apache.org/jira/browse/FLINK-10011
Project: Flink
Issue Type: Bug
Components: JobManager
Affects Versions: 1.4.2
Reporter: Elias Levy
For the second time we've observed Flink resurrect an old job during JobManager
high-availability fail over.
h4. Configuration
* AWS environment
* Flink 1.4.2 standalone cluster in HA mode
* 2 JMs, 3 TMs
* 3 node ZK ensemble
* 1 job consuming from and producing to Kafka
* Checkpoints in S3 using the Presto file system adaptor
h4. Timeline
* 15:18:10 JM 2 completes checkpoint 69256.
* 15:19:10 JM 2 completes checkpoint 69257.
* 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a
SocketTimeoutException
* 15:19:57 ZK 1 closes connection to JM 2 (leader)
* 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 1
* 15:19:57 JM 2 reports it can't read data from ZK
** {{Unable to read additional data from server sessionid 0x30000003f4a0003,
likely server has closed socket, closing socket connection and attempting
reconnect)}}
** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
* 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from
ZooKeeper.}}
** {{ZooKeeper connection SUSPENDED. Changes to the submitted job graphs
are not monitored (temporarily).}}
** {{Connection to ZooKeeper suspended. The contender
akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in the
leader election}}
** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from
ZooKeeper.}}
* 15:19:57 JM 2 gives up leadership
** {{JobManager akka://flink/user/jobmanager#33755521 was revoked leadership.}}
* 15:19:57 JM 2 changes job
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
** {{Stopping checkpoint coordinator for job
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
* 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages
because there is no leader
** {{Discard message
LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception:
TaskManager akka://flink/user/taskmanager is disassociating)) because there is
currently no valid leader id known.}}
* 15:19:57 JM 2 connects to ZK 2 and renews its session
** {{Opening socket connection to server
ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
** {{Socket connection established to
ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
** {{Connection to ZooKeeper was reconnected. Leader retrieval can be
restarted.}}
** {{Session establishment complete on server
ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid =
0x30000003f4a0003, negotiated timeout = 40000}}
** {{Connection to ZooKeeper was reconnected. Leader election can be
restarted.}}
** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are
monitored again.}}
** {{State change: RECONNECTED}}
* 15:19:57 JM 1 reports it has been granted leadership:
** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted
leadership with leader session ID Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}
* 15:19:57 JM 2 reports the job has been suspended
** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter Shutting
down.}}
** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}
* 15:19:57 JM 2 reports it has lost leadership:
** {{Associated JobManager
Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}}
** {{Received leader address but not running in leader ActorSystem. Cancelling
registration.}}
* 15:19:57 TMs register with JM 1
* 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs:
** {{Attempting to recover all jobs.}}
** {{There are 2 jobs to recover. Starting the job recovery.}}
** {{Attempting to recover job
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}}
** {{Attempting to recover job
{color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}}
* 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form:
** {{Got user-level KeeperException when processing
sessionid:0x2000001d2330001 type:create cxid:0x4211 zxid:0x60009dc70 txntype:-1
reqpath:n/a Error
Path:/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1
Error:KeeperErrorCode = NodeExists for
/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1}}
** {{Got user-level KeeperException when processing
sessionid:0x2000001d2330001 type:create cxid:0x4230 zxid:0x60009dc78 txntype:-1
reqpath:n/a Error
Path:/flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2
Error:KeeperErrorCode = NodeExists for
/flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2}}
* 15:29:08 JM 1 starts to recover job
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}
** {{Recovered SubmittedJobGraph(2a4eff355aef849c5ca37dbac04f2ff1,
JobInfo(clients:
Set((Actor[akka.tcp://[email protected]:37564/temp/$c],DETACHED)),
start: 1528833686265)).}}
** {{Submitting recovered job 2a4eff355aef849c5ca37dbac04f2ff1.}}
** {{Submitting job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job) (Recovery).}}
** {{Using restart strategy
FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
delayBetweenRestartAttempts=30000) for 2a4eff355aef849c5ca37dbac04f2ff1.}}
** {{Successfully ran initialization on master in 0 ms.}}
** {{Job recovers via failover strategy: full graph restart}}
** {{Running initialization on master for job Some Job
(2a4eff355aef849c5ca37dbac04f2ff1).}}
** {{Initialized in '/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1'.}}
** {{Using application-defined state backend for checkpoint/savepoint
metadata: File State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
** {{Persisting periodic checkpoints externally at
s3://bucket/flink/cluster_1/checkpoints-external.}}
** {{Recovering checkpoints from ZooKeeper.}}
** {{Found 3 checkpoints in ZooKeeper.}}
** {{Trying to retrieve checkpoint 69255.}}
** {{Trying to fetch 3 checkpoints from storage.}}
** {{Trying to retrieve checkpoint 69256.}}
** {{Trying to retrieve checkpoint 69257.}}
** {{Restoring from latest valid checkpoint: Checkpoint 69257 @ 1532989148882
for 2a4eff355aef849c5ca37dbac04f2ff1.}}
** {{Scheduling job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job).}}
** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state
CREATED to RUNNING.}}
** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state
RUNNING to FAILING.}}{{
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not
enough free slots available to run the job. You can decrease the operator
parallelism or increase the number of slots per TaskManager in the
configuration.}}
* 15:20:09 JM 1 starts to recover
{color:#d04437}61bca496065cd05e4263070a5e923a05{color}
** {{Recovered SubmittedJobGraph(61bca496065cd05e4263070a5e923a05,
JobInfo(clients:
Set((Actor[akka.tcp://[email protected]:41001/temp/$c],DETACHED)),
start: 1525728377900)).}}
** {{Submitting recovered job 61bca496065cd05e4263070a5e923a05.}}
** {{Submitting job 61bca496065cd05e4263070a5e923a05 (Some Job) (Recovery).}}
** {{Using restart strategy
FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
delayBetweenRestartAttempts=30000) for 61bca496065cd05e4263070a5e923a05.}}
** {{Job recovers via failover strategy: full graph restart}}
** {{Successfully ran initialization on master in 0 ms.}}
** {{Running initialization on master for job Some Job
(61bca496065cd05e4263070a5e923a05).}}
** {{Initialized in '/checkpoints/61bca496065cd05e4263070a5e923a05'.}}
** {{Using application-defined state backend for checkpoint/savepoint
metadata: File State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
** {{Persisting periodic checkpoints externally at
s3://bucket/flink/cluster_1/checkpoints-external.}}
** {{Recovering checkpoints from ZooKeeper.}}
** {{Scheduling job 61bca496065cd05e4263070a5e923a05 (Some Job).}}
** {{Job Some Job (61bca496065cd05e4263070a5e923a05) switched from state
CREATED to RUNNING.}}
** {{Trying to fetch 0 checkpoints from storage}}
** {{Found 0 checkpoints in ZooKeeper.}}
* 15:20:09 JM 1 reports a bunch of metric collisions because of the two jobs:
** {{Name collision: Group already contains a Metric with the name
'lastCheckpointSize'. Metric will not be reported.[jobmanager, job]}}
** {{Name collision: Group already contains a Metric with the name
'lastCheckpointAlignmentBuffered'. Metric will not be reported.[jobmanager,
job]}}
** etc
* 15:20:39 JM 1 begins attempting to restart the
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} job repeatedly
** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state
FAILING to RESTARTING.}}
** {{Restarting the job Some Job (2a4eff355aef849c5ca37dbac04f2ff1).}}
** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state
RESTARTING to CREATED.}}
** {{Recovering checkpoints from ZooKeeper.}}
** {{Found 3 checkpoints in ZooKeeper.}}
** {{Trying to fetch 3 checkpoints from storage.}}
** {{Trying to retrieve checkpoint 69255.}}
** {{Trying to retrieve checkpoint 69256.}}
** {{Trying to retrieve checkpoint 69257.}}
** {{Restoring from latest valid checkpoint: Checkpoint 69257 @ 1532989148882
for 2a4eff355aef849c5ca37dbac04f2ff1.}}
** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state
CREATED to RUNNING.}}
** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state
RUNNING to FAILING.}}
* 15:35:39 ZK 1 reestablishes connection with ZK 2 and 3 and becomes a follower
h4. Analysis
The cluster was running job
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}. The JM HA leader was
JM 2, which was connected to ZK 1. ZK 1 was a follower in the ZK ensemble.
The ZK leader was ZK 2.
ZK 1 lost network connectivity for about 16 minutes. Upon loss of connectivity
to ZK 1, JM 2 gives up leadership and shuts down the
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} job. JM 2 then
immediately connects to ZK 2, without its ZK session having expired.
Nonetheless, because it gave up leadership, JM 1 is elected the new JM leader.
JM 1 then erroneously decides there are two jobs to restore: the previously
running job, {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, and
{color:#d04437}61bca496065cd05e4263070a5e923a05{color}. JM 1 decides there are
no checkpoints for {color:#d04437}61bca496065cd05e4263070a5e923a05{color},
which starts right away. JM 1 attempts to restore
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} from the latest
checkpoint, but it fails because there aren't enough task slots in the cluster
as a result of the other job starting. Thus,
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} entered a restart loop.
We manually stopped both jobs and started a new job based on the last known
checkpoint for {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.
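The restart loop can be illustrated with a toy slot-accounting model. This is only a sketch and not Flink's scheduler: the report does not state the slot count or the job parallelism, so the numbers below are made up, and {{SlotPool}} and {{try_schedule}} are hypothetical names.

```python
from dataclasses import dataclass, field


@dataclass
class SlotPool:
    """Toy model of the cluster's task slots (not Flink's real scheduler)."""
    total_slots: int
    allocated: dict = field(default_factory=dict)  # job label -> slots held

    def free_slots(self) -> int:
        return self.total_slots - sum(self.allocated.values())

    def try_schedule(self, job: str, parallelism: int) -> bool:
        """Allocate every slot the job needs, or none at all."""
        if parallelism > self.free_slots():
            # Stands in for NoResourceAvailableException in the logs.
            return False
        self.allocated[job] = parallelism
        return True


# Assumed numbers: 3 TMs x 4 slots each, and both jobs need all 12 slots.
pool = SlotPool(total_slots=12)

# The old job has no checkpoints to fetch, so it is scheduled first.
old_job_running = pool.try_schedule("old_job", parallelism=12)

# The current job restores its checkpoint, then finds no free slots on
# every restart attempt for as long as the old job holds them.
attempts = [pool.try_schedule("current_job", parallelism=12) for _ in range(3)]

print(old_job_running, attempts)
```

Under these assumptions the old job takes every slot and each restart attempt of the current job fails the same way, which matches the repeated RUNNING to FAILING transitions in the timeline.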
Job {color:#d04437}61bca496065cd05e4263070a5e923a05{color} is an old job that
we ran for a month between May 7th and June 5th.
After our manual intervention, the HA state directory in S3 looks like this:
{{s3cmd ls s3://bucket/flink/cluster_1/recovery/}}
{{ DIR s3://bucket/flink/cluster_1/recovery/some_job/}}
{{2018-07-31 17:33 35553
s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5}}
{{2018-07-31 17:34 35553
s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb}}
{{2018-07-31 17:32 35553
s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02}}
{{2018-06-12 20:01 284626
s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec}}
{{2018-07-30 23:01 285257
s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c}}
submittedJobGraph7f627a661cec appears to be job
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, the long-running job
that failed during the ZK failover.
submittedJobGraphf3767780c00c appears to be job
{color:#205081}d77948df92813a68ea6dfd6783f40e7e{color}, the job we started by
restoring from a checkpoint after shutting down the duplicate jobs.
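One way to cross-check these attributions is to decode the {{start:}} values that JM 1 logged in the {{Recovered SubmittedJobGraph(...)}} lines, along with the timestamp of checkpoint 69257. The sketch below assumes those values are epoch milliseconds; {{epoch_ms_to_utc}} is a helper name introduced here for illustration.

```python
from datetime import datetime, timezone


def epoch_ms_to_utc(ms: int) -> str:
    """Convert epoch milliseconds to an ISO-8601 UTC string (second precision)."""
    return datetime.fromtimestamp(ms // 1000, tz=timezone.utc).isoformat()


# Values copied from the log lines in the timeline.
job_start_ms = {
    "2a4eff355aef849c5ca37dbac04f2ff1": 1528833686265,
    "61bca496065cd05e4263070a5e923a05": 1525728377900,
}
checkpoint_69257_ms = 1532989148882

for job_id, ms in job_start_ms.items():
    print(job_id, epoch_ms_to_utc(ms))
print("checkpoint 69257", epoch_ms_to_utc(checkpoint_69257_ms))
```

The first job decodes to 2018-06-12T20:01:26 UTC. If the s3cmd listing is in UTC, that lines up with the 2018-06-12 20:01 timestamp on submittedJobGraph7f627a661cec. The second decodes to 2018-05-07T21:26:17 UTC, consistent with the old job having started on May 7th. Checkpoint 69257 decodes to 2018-07-30T22:19:08 UTC, which suggests the timeline is in a UTC-7 local time.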
A few questions come to mind.
h5. Why does the JM terminate running jobs when it can immediately connect to
another ZK node and its ZK session has not expired?
This seems to be a result of using the LeaderLatch recipe in Curator. Its
[docs|https://github.com/Netflix/curator/wiki/Leader-Latch] state:
{quote}LeaderLatch instances add a ConnectionStateListener to watch for
connection problems. If SUSPENDED or LOST is reported, the LeaderLatch that is
the *leader will report that it is no longer the leader* (i.e. there will not
be a leader until the connection is re-established). If a LOST connection is
RECONNECTED, the LeaderLatch *will delete its previous ZNode and create a new
one*.
Users of LeaderLatch must take account that connection issues can cause
leadership to be lost. i.e. hasLeadership() returns true but some time later
the connection is SUSPENDED or LOST. At that point hasLeadership() will return
false. It is highly recommended that LeaderLatch users register a
ConnectionStateListener.
{quote}
So not only is leadership lost while disconnected, but it will likely stay lost
when reconnecting as a result of the znode deletion and recreation.
This can probably be solved by using the Curator LeaderElection recipe instead,
which [states|https://github.com/Netflix/curator/wiki/Leader-Election]:
{quote}The {{LeaderSelectorListener}} class extends
{{ConnectionStateListener}}. When the LeaderSelector is started, it adds the
listener to the Curator instance. Users of the {{LeaderSelector}} must pay
attention to any connection state changes. If an instance becomes the leader,
it should respond to notification of being SUSPENDED or LOST.
If the SUSPENDED state is reported, the instance must assume that it might no
longer be the leader until it receives a RECONNECTED state. If the LOST state
is reported, the instance is no longer the leader and its {{takeLeadership}}
method should exit.
{quote}
So with LeaderElection it seems that what to do during the SUSPENDED state is
left up to the application, which may choose to continue being leader until the
state becomes LOST.
Obviously there are dangers with doing so, but at the very least you would
expect the JM not to give up leadership until it tried to connect to other ZK
members within the remaining ZK session timeout.
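The difference between the two behaviours can be sketched as a small decision function. This is a simplified model and not Curator's API: {{ConnState}} and {{keeps_leadership}} are hypothetical names, and the 500 ms reconnect gap is an assumption (the logs only show that the disconnect and reconnect happened within the same second). The 40000 ms timeout is the negotiated value from the JM 2 log.

```python
from enum import Enum


class ConnState(Enum):
    SUSPENDED = "SUSPENDED"
    RECONNECTED = "RECONNECTED"
    LOST = "LOST"


def keeps_leadership(events, session_timeout_ms, revoke_on_suspend):
    """Return True if a leader would still be leader after the given events.

    events: list of (timestamp_ms, ConnState) in chronological order.
    revoke_on_suspend=True models giving up leadership as soon as the
    connection is SUSPENDED. False models holding on until the session is
    LOST or the suspension lasts at least as long as the session timeout.
    A trace that ends while still suspended is treated as "still leader".
    """
    suspended_at = None
    for ts, state in events:
        if state is ConnState.SUSPENDED:
            if revoke_on_suspend:
                return False
            suspended_at = ts
        elif state is ConnState.RECONNECTED:
            if suspended_at is not None and ts - suspended_at >= session_timeout_ms:
                return False  # the session would have expired meanwhile
            suspended_at = None
        elif state is ConnState.LOST:
            return False
    return True


SESSION_TIMEOUT_MS = 40000  # "negotiated timeout = 40000" in the JM 2 log

# Assumed trace for JM 2: suspended, then reconnected 500 ms later.
brief_blip = [(0, ConnState.SUSPENDED), (500, ConnState.RECONNECTED)]
# Hypothetical longer outage that outlasts the session timeout.
long_outage = [(0, ConnState.SUSPENDED), (45000, ConnState.RECONNECTED)]

print(keeps_leadership(brief_blip, SESSION_TIMEOUT_MS, revoke_on_suspend=True))
print(keeps_leadership(brief_blip, SESSION_TIMEOUT_MS, revoke_on_suspend=False))
print(keeps_leadership(long_outage, SESSION_TIMEOUT_MS, revoke_on_suspend=False))
```

With the brief blip, the revoke-on-suspend policy drops leadership while the tolerant policy keeps it. The tolerant policy still gives up leadership once the outage outlasts the session timeout. A real implementation would also have to stop acting as leader while suspended if it cannot tolerate two leaders, which is the danger noted above.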
This problem has been [previously
discussed|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Zookeeper-failure-handling-td19611.html]
in the mailing list, which led to FLINK-6174 and this
[PR|https://github.com/apache/flink/pull/3599], which appears to be a
modification of the Curator LeaderLatch recipe. It also led to FLINK-5703,
which seems like an attempt to keep jobs running during JM failover. While
that is a valuable addition, I argue that it is not required to avoid this
failure, as the previous leader can continue being leader so long as it
connects to a new ZK before its ZK session expires.
h5. Why did JM 1 resurrect the old job?
Something must have been off with the state stored in the S3 HA recovery
directory. The job must not have been fully removed.
In fact, right now the recovery directory has the file
submittedJobGraph7f627a661cec which appears to be job
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}. Is that expected? That
job is no longer running. Shouldn't that file no longer exist in the recovery
directory?