[
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564569#comment-16564569
]
Elias Levy edited comment on FLINK-10011 at 8/1/18 12:48 AM:
-------------------------------------------------------------
It appears that it may not be necessary to replace the {{LeaderLatch}} Curator
recipe to avoid losing leadership during temporary connection failures.
The Curator error handling
[documentation|https://curator.apache.org/errors.html] describes a
{{SessionConnectionStateErrorPolicy}} that treats the {{SUSPENDED}} and {{LOST}}
connection states differently. This
[test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146]
shows that leadership is not lost with a {{LeaderLatch}} and that policy, and this is the
[code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631]
implementing the policy. Finally, [this
shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314]
that Curator will inject a session expiration even while it is in the
{{SUSPENDED}} state, so that a disconnected client won't continue to think it
is the leader past its session expiration.
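For illustration, a minimal sketch of building a client with that policy and using it with a {{LeaderLatch}} (plain Curator packages rather than Flink's shaded ones; the connect string, retry policy, and latch path are made-up values, not Flink's actual configuration):
{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class SessionPolicyLatchSketch {
    public static void main(String[] args) throws Exception {
        // Treat only session loss (LOST) as an error; a SUSPENDED connection
        // by itself no longer revokes leadership.
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("zk-1:2181,zk-2:2181,zk-3:2181")   // hypothetical ensemble
                .sessionTimeoutMs(40000)                          // matches the negotiated timeout seen in the logs
                .retryPolicy(new ExponentialBackoffRetry(1000, 10))
                .connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())
                .build();
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/flink/cluster_a/leaderlatch"); // hypothetical path
        latch.start();
        latch.await(); // blocks until this instance becomes leader

        // With the session policy, a temporary disconnect (SUSPENDED) alone does
        // not flip hasLeadership() to false; only session expiration does.
        System.out.println("has leadership: " + latch.hasLeadership());
    }
}
{code}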
> Old job resurrected during HA failover
> --------------------------------------
>
> Key: FLINK-10011
> URL: https://issues.apache.org/jira/browse/FLINK-10011
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.4.2
> Reporter: Elias Levy
> Priority: Blocker
>
> For the second time we've observed Flink resurrect an old job during
> JobManager high-availability fail over.
> h4. Configuration
> * AWS environment
> * Flink 1.4.2 standalone cluster in HA mode
> * 2 JMs, 3 TMs
> * 3 node ZK ensemble
> * 1 job consuming from and producing to Kafka
> * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline
> * 15:18:10 JM 2 completes checkpoint 69256.
> * 15:19:10 JM 2 completes checkpoint 69257.
> * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a
> SocketTimeoutException
> * 15:19:57 ZK 1 closes connection to JM 2 (leader)
> * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK
> 1
> * 15:19:57 JM 2 reports it can't read data from ZK
> ** {{Unable to read additional data from server sessionid 0x30000003f4a0003,
> likely server has closed socket, closing socket connection and attempting
> reconnect)}}
> ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
> * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
> ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader
> from ZooKeeper.}}
> ** {{ZooKeeper connection SUSPENDED. Changes to the submitted job graphs
> are not monitored (temporarily).}}
> ** {{Connection to ZooKeeper suspended. The contender
> akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in
> the leader election}}
> ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader
> from ZooKeeper.}}
> * 15:19:57 JM 2 gives up leadership
> ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked
> leadership.}}
> * 15:19:57 JM 2 changes job
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
> ** {{Stopping checkpoint coordinator for job
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
> * 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages
> because there is no leader
> ** {{Discard message
> LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception:
> TaskManager akka://flink/user/taskmanager is disassociating)) because there
> is currently no valid leader id known.}}
> * 15:19:57 JM 2 connects to ZK 2 and renews its session
> ** {{Opening socket connection to server
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
> ** {{Socket connection established to
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
> ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be
> restarted.}}
> ** {{Session establishment complete on server
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid =
> 0x30000003f4a0003, negotiated timeout = 40000}}
> ** {{Connection to ZooKeeper was reconnected. Leader election can be
> restarted.}}
> ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs
> are monitored again.}}
> ** {{State change: RECONNECTED}}
> * 15:19:57 JM 1 reports it has been granted leadership:
> ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted
> leadership with leader session ID
> Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}
> * 15:19:57 JM 2 reports the job has been suspended
> ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter
> Shutting down.}}
> ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}
> * 15:19:57 JM 2 reports it has lost leadership:
> ** {{Associated JobManager
> Actor[akka://flink/user/jobmanager#33755521] lost leader status}}
> ** {{Received leader address but not running in leader ActorSystem.
> Cancelling registration.}}
> * 15:19:57 TMs register with JM 1
> * 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs:
> ** {{Attempting to recover all jobs.}}
> ** {{There are 2 jobs to recover. Starting the job recovery.}}
> ** {{Attempting to recover job
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}}
> ** {{Attempting to recover job
> {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}}
> * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form:
> ** {{Got user-level KeeperException when processing
> sessionid:0x2000001d2330001 type:create cxid:0x4211 zxid:0x60009dc70
> txntype:-1 reqpath:n/a Error
> Path:/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1
> Error:KeeperErrorCode = NodeExists for
> /flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1}}
> ** {{Got user-level KeeperException when processing
> sessionid:0x2000001d2330001 type:create cxid:0x4230 zxid:0x60009dc78
> txntype:-1 reqpath:n/a Error
> Path:/flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2
> Error:KeeperErrorCode = NodeExists for
> /flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2}}
> * 15:29:08 JM 1 starts to recover job
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}
> ** {{Recovered SubmittedJobGraph(2a4eff355aef849c5ca37dbac04f2ff1,
> JobInfo(clients:
> Set((Actor[akka.tcp://[email protected]:37564/temp/$c],DETACHED)),
> start: 1528833686265)).}}
> ** {{Submitting recovered job 2a4eff355aef849c5ca37dbac04f2ff1.}}
> ** {{Submitting job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job) (Recovery).}}
> ** {{Using restart strategy
> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
> delayBetweenRestartAttempts=30000) for 2a4eff355aef849c5ca37dbac04f2ff1.}}
> ** {{Successfully ran initialization on master in 0 ms.}}
> ** {{Job recovers via failover strategy: full graph restart}}
> ** {{Running initialization on master for job Some Job
> (2a4eff355aef849c5ca37dbac04f2ff1).}}
> ** {{Initialized in '/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1'.}}
> ** {{Using application-defined state backend for checkpoint/savepoint
> metadata: File State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
> ** {{Persisting periodic checkpoints externally at
> s3://bucket/flink/cluster_1/checkpoints-external.}}
> ** {{Recovering checkpoints from ZooKeeper.}}
> ** {{Found 3 checkpoints in ZooKeeper.}}
> ** {{Trying to retrieve checkpoint 69255.}}
> ** {{Trying to fetch 3 checkpoints from storage.}}
> ** {{Trying to retrieve checkpoint 69256.}}
> ** {{Trying to retrieve checkpoint 69257.}}
> ** {{Restoring from latest valid checkpoint: Checkpoint 69257 @
> 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.}}
> ** {{Scheduling job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job).}}
> ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state
> CREATED to RUNNING.}}
> ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state
> RUNNING to FAILING.}}
> ** {{org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the operator
> parallelism or increase the number of slots per TaskManager in the
> configuration.}}
> * 15:20:09 JM 1 starts to recover
> {color:#d04437}61bca496065cd05e4263070a5e923a05{color}
> ** {{Recovered SubmittedJobGraph(61bca496065cd05e4263070a5e923a05,
> JobInfo(clients:
> Set((Actor[akka.tcp://[email protected]:41001/temp/$c],DETACHED)),
> start: 1525728377900)).}}
> ** {{Submitting recovered job 61bca496065cd05e4263070a5e923a05.}}
> ** {{Submitting job 61bca496065cd05e4263070a5e923a05 (Some Job) (Recovery).}}
> ** {{Using restart strategy
> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
> delayBetweenRestartAttempts=30000) for 61bca496065cd05e4263070a5e923a05.}}
> ** {{Job recovers via failover strategy: full graph restart}}
> ** {{Successfully ran initialization on master in 0 ms.}}
> ** {{Running initialization on master for job Some Job
> (61bca496065cd05e4263070a5e923a05).}}
> ** {{Initialized in '/checkpoints/61bca496065cd05e4263070a5e923a05'.}}
> ** {{Using application-defined state backend for checkpoint/savepoint
> metadata: File State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
> ** {{Persisting periodic checkpoints externally at
> s3://bucket/flink/cluster_1/checkpoints-external.}}
> ** {{Recovering checkpoints from ZooKeeper.}}
> ** {{Scheduling job 61bca496065cd05e4263070a5e923a05 (Some Job).}}
> ** {{Job Some Job (61bca496065cd05e4263070a5e923a05) switched from state
> CREATED to RUNNING.}}
> ** {{Trying to fetch 0 checkpoints from storage}}
> ** {{Found 0 checkpoints in ZooKeeper.}}
> * 15:20:09 JM 1 reports a bunch of metric collisions because of the two jobs:
> ** {{Name collision: Group already contains a Metric with the name
> 'lastCheckpointSize'. Metric will not be reported.[jobmanager, job]}}
> ** {{Name collision: Group already contains a Metric with the name
> 'lastCheckpointAlignmentBuffered'. Metric will not be reported.[jobmanager,
> job]}}
> ** etc
> * 15:20:39 JM 1 begins attempting to restart the
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} job repeatedly
> ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state
> FAILING to RESTARTING.}}
> ** {{Restarting the job Some Job (2a4eff355aef849c5ca37dbac04f2ff1).}}
> ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state
> RESTARTING to CREATED.}}
> ** {{Recovering checkpoints from ZooKeeper.}}
> ** {{Found 3 checkpoints in ZooKeeper.}}
> ** {{Trying to fetch 3 checkpoints from storage.}}
> ** {{Trying to retrieve checkpoint 69255.}}
> ** {{Trying to retrieve checkpoint 69256.}}
> ** {{Trying to retrieve checkpoint 69257.}}
> ** {{Restoring from latest valid checkpoint: Checkpoint 69257 @
> 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.}}
> ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state
> CREATED to RUNNING.}}
> ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state
> RUNNING to FAILING.}}
> * 15:35:39 ZK 1 reestablishes connection with ZK 2 and 3 and becomes a
> follower
>
> h4. Analysis
>
> The cluster was running job
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}. The JM HA leader was
> JM 2, which was connected to ZK 1. ZK 1 was a follower in the ZK ensemble.
> The ZK leader was ZK 2.
> ZK 1 lost network connectivity for about 16 minutes. Upon loss of
> connectivity to ZK 1, JM 2 gives up leadership and shuts down the
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} job. JM 2 then
> immediately connects to ZK 2, without its ZK session having expired.
> Nonetheless, as it gave up leadership, JM 1 is elected the new JM leader.
> JM 1 then erroneously decides there are two jobs to restore. The previously
> running job, {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, and
> {color:#d04437}61bca496065cd05e4263070a5e923a05{color}. JM 1 decides there
> are no checkpoints for
> {color:#d04437}61bca496065cd05e4263070a5e923a05{color}, so that job starts
> right away. JM 1 attempts to restore
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} from the latest
> checkpoint, but it fails because there aren't enough task slots in the
> cluster as a result of the other job starting. Thus,
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} enters a restart loop.
> We manually stopped both jobs and started a new job based on the last known
> checkpoint for {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.
>
> Job {color:#d04437}61bca496065cd05e4263070a5e923a05{color} is an old job
> that we ran for a month between May 7th and June 5th.
> After our manual intervention, the HA state directory in S3 looks like
> this:
> {{s3cmd ls s3://bucket/flink/cluster_1/recovery/}}
> {{ DIR s3://bucket/flink/cluster_1/recovery/some_job/}}
> {{2018-07-31 17:33 35553
> s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5}}
> {{2018-07-31 17:34 35553
> s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb}}
> {{2018-07-31 17:32 35553
> s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02}}
> {{2018-06-12 20:01 284626
> s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec}}
> {{2018-07-30 23:01 285257
> s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c}}
> submittedJobGraph7f627a661cec appears to be job
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, the long-running job
> that failed during the ZK failover.
> submittedJobGraphf3767780c00c appears to be job
> {color:#205081}d77948df92813a68ea6dfd6783f40e7e{color}, the job we started
> restoring from a checkpoint after shutting down the duplicate jobs.
>
> A few questions come to mind.
> h5. Why does the JM terminate running jobs when it can immediately connect to
> another ZK node and its ZK session has not expired?
> This seems to be a result of using the LeaderLatch recipe in Curator. Its
> [docs|https://github.com/Netflix/curator/wiki/Leader-Latch] state:
> {quote}LeaderLatch instances add a ConnectionStateListener to watch for
> connection problems. If SUSPENDED or LOST is reported, the LeaderLatch that
> is the *leader will report that it is no longer the leader* (i.e. there will
> not be a leader until the connection is re-established). If a LOST connection
> is RECONNECTED, the LeaderLatch *will delete its previous ZNode and create a
> new one*.
> Users of LeaderLatch must take account that connection issues can cause
> leadership to be lost. i.e. hasLeadership() returns true but some time later
> the connection is SUSPENDED or LOST. At that point hasLeadership() will
> return false. It is highly recommended that LeaderLatch users register a
> ConnectionStateListener.
> {quote}
> So not only is leadership lost while disconnected, but it will likely stay
> lost upon reconnecting as a result of the znode deletion and recreation.
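> As a rough illustration of the behavior the docs describe, registering the recommended listeners looks roughly like this (a minimal sketch using plain Curator packages; the class name and log messages are mine, not Flink's):
> {code:java}
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.recipes.leader.LeaderLatch;
> import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.framework.state.ConnectionStateListener;
>
> public final class LatchMonitoringSketch {
>
>     // Registers the listeners the Curator docs recommend. With the default
>     // (standard) error policy, notLeader() fires as soon as the connection is
>     // SUSPENDED, which is what revokes the JM's leadership here.
>     static void monitor(CuratorFramework client, LeaderLatch latch) {
>         latch.addListener(new LeaderLatchListener() {
>             @Override public void isLeader()  { System.out.println("became leader"); }
>             @Override public void notLeader() { System.out.println("lost leadership"); }
>         });
>
>         client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
>             @Override
>             public void stateChanged(CuratorFramework c, ConnectionState newState) {
>                 // SUSPENDED: connectivity lost, but the ZK session may still be alive.
>                 // LOST: the session expired; leadership is definitely gone.
>                 System.out.println("connection state: " + newState);
>             }
>         });
>     }
>
>     private LatchMonitoringSketch() {}
> }
> {code}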
> This can probably be solved by using the Curator LeaderElection recipe
> instead, which
> [states|https://github.com/Netflix/curator/wiki/Leader-Election]:
> {quote}The {{LeaderSelectorListener}} class extends
> {{ConnectionStateListener}}. When the LeaderSelector is started, it adds the
> listener to the Curator instance. Users of the {{LeaderSelector}} must pay
> attention to any connection state changes. If an instance becomes the leader,
> it should respond to notification of being SUSPENDED or LOST.
> If the SUSPENDED state is reported, the instance must assume that it might no
> longer be the leader until it receives a RECONNECTED state. If the LOST state
> is reported, the instance is no longer the leader and its {{takeLeadership}}
> method should exit.
> {quote}
> So with LeaderElection it seems that what to do during the SUSPENDED state is
> left up to the application, which may choose to continue being leader until
> the state becomes LOST.
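> For comparison, a minimal sketch (plain Curator packages, hypothetical znode path) of a {{LeaderSelectorListenerAdapter}} that keeps leadership through SUSPENDED and only relinquishes it on LOST:
> {code:java}
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
> import org.apache.curator.framework.recipes.leader.LeaderSelector;
> import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
> import org.apache.curator.framework.state.ConnectionState;
>
> // Keeps leadership through a SUSPENDED connection and only gives it up when
> // the session is LOST. Curator's own adapter cancels on both states, so
> // stateChanged() is overridden here.
> public class TolerantLeaderListenerSketch extends LeaderSelectorListenerAdapter {
>
>     @Override
>     public void takeLeadership(CuratorFramework client) throws Exception {
>         // Leadership is held for as long as this method does not return.
>         Thread.currentThread().join();
>     }
>
>     @Override
>     public void stateChanged(CuratorFramework client, ConnectionState newState) {
>         if (newState == ConnectionState.LOST) {
>             // Session expired: this instance is definitely no longer the leader.
>             throw new CancelLeadershipException();
>         }
>         // SUSPENDED / RECONNECTED: keep leadership and let the session either
>         // recover or expire.
>     }
>
>     // Hypothetical wiring; the election path is made up for this sketch.
>     public static LeaderSelector run(CuratorFramework client) {
>         LeaderSelector selector =
>                 new LeaderSelector(client, "/flink/cluster_a/leaderelection", new TolerantLeaderListenerSketch());
>         selector.autoRequeue();
>         selector.start();
>         return selector;
>     }
> }
> {code}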
> Obviously there are dangers with doing so, but at the very least you would
> expect the JM not to give up leadership until it tried to connect to other ZK
> members within the remaining ZK session timeout.
> This problem has been [previously
> discussed|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Zookeeper-failure-handling-td19611.html]
> in the mailing list, which led to FLINK-6174 and this
> [PR|https://github.com/apache/flink/pull/3599], which appears to be a
> modification of the Curator LeaderLatch recipe. It also led to FLINK-5703,
> which seems like an attempt to keep jobs running during JM failover. While
> that is a valuable addition, I argue it is not required to avoid this
> failure, as the previous leader can continue being leader so long as it
> reconnects to another ZK server before its ZK session expires.
>
> h5. Why did JM 1 resurrect the old job?
> Something must have been off with the state stored in the S3 HA recovery
> directory. The job must not have been fully removed.
> In fact, right now the recovery directory has the file
> submittedJobGraph7f627a661cec, which appears to be job
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}. Is that expected?
> That job is no longer running. Shouldn't that file no longer exist in the
> recovery directory?
>
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)