[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564569#comment-16564569 ]
Elias Levy edited comment on FLINK-10011 at 8/1/18 12:48 AM: ------------------------------------------------------------- It appears that it may not be necessary to replace the {{LeaderLatch}} Curator recipe to avoid losing leadership during temporary connection failures. The Curator error handling [documentation|https://curator.apache.org/errors.html] describes a {{SessionConnectionStateErrorPolicy}} that treats the {{SUSPENDED}} and {{LOST}} connection states differently, and this [test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146] shows that leadership is not lost with a {{LeaderLatch}} under that policy. The policy is implemented [here|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631]. Moreover, [this shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314] that Curator will inject a session expiration even while it is in the {{SUSPENDED}} state, so a disconnected client won't continue to believe it is the leader past its session expiration.
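If I read the Curator API correctly, the wiring would look roughly like this (an untested sketch; the connect string, timeouts, and latch path are placeholders I made up, and the 40 s session timeout simply mirrors the negotiated timeout in the logs below):
{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class SessionPolicyLatch {
    public static void main(String[] args) throws Exception {
        // SessionConnectionStateErrorPolicy treats only LOST (session expiration)
        // as an error state, so a merely SUSPENDED connection does not revoke
        // leadership, unlike the default StandardConnectionStateErrorPolicy.
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("zk-1:2181,zk-2:2181,zk-3:2181")  // placeholder ensemble
                .sessionTimeoutMs(40_000)
                .retryPolicy(new ExponentialBackoffRetry(1_000, 3))
                .connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())
                .build();
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/flink/cluster_a/leaderlatch");  // placeholder path
        latch.start();
        latch.await();  // blocks until this instance becomes leader

        // ... leader work; leadership now survives SUSPENDED, and Curator still
        // injects a session expiration (LOST) if the session genuinely expires.
    }
}
{code}
With that policy, a JM whose connection is merely SUSPENDED keeps its leadership, and per the ConnectionStateManager link above it would still receive an injected LOST if the session actually expires.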
> Old job resurrected during HA failover
> --------------------------------------
>
> Key: FLINK-10011
> URL: https://issues.apache.org/jira/browse/FLINK-10011
> Project: Flink
> Issue Type: Bug
> Components: JobManager
> Affects Versions: 1.4.2
> Reporter: Elias Levy
> Priority: Blocker
>
> For the second time we've observed Flink resurrect an old job during JobManager high-availability failover.
> h4. Configuration
> * AWS environment
> * Flink 1.4.2 standalone cluster in HA mode
> * 2 JMs, 3 TMs
> * 3-node ZK ensemble
> * 1 job consuming from and producing to Kafka
> * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline
> * 15:18:10 JM 2 completes checkpoint 69256.
> * 15:19:10 JM 2 completes checkpoint 69257.
> * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a SocketTimeoutException
> * 15:19:57 ZK 1 closes connection to JM 2 (leader)
> * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 1
> * 15:19:57 JM 2 reports it can't read data from ZK
> ** {{Unable to read additional data from server sessionid 0x30000003f4a0003, likely server has closed socket, closing socket connection and attempting reconnect}}
> ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
> * 15:19:57 JM 2's ZK Curator client changes connection state to SUSPENDED
> ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}}
> ** {{ZooKeeper connection SUSPENDED. Changes to the submitted job graphs are not monitored (temporarily).}}
> ** {{Connection to ZooKeeper suspended. The contender akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in the leader election}}
> ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}}
> * 15:19:57 JM 2 gives up leadership
> ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked leadership.}}
> * 15:19:57 JM 2 changes job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
> ** {{Stopping checkpoint coordinator for job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
> * 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages because there is no valid leader
> ** {{Discard message LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception: TaskManager akka://flink/user/taskmanager is disassociating)) because there is currently no valid leader id known.}}
> * 15:19:57 JM 2 connects to ZK 2 and renews its session
> ** {{Opening socket connection to server ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
> ** {{Socket connection established to ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
> ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.}}
> ** {{Session establishment complete on server ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 0x30000003f4a0003, negotiated timeout = 40000}}
> ** {{Connection to ZooKeeper was reconnected. Leader election can be restarted.}}
> ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are monitored again.}}
> ** {{State change: RECONNECTED}}
> * 15:19:57 JM 1 reports it has been granted leadership:
> ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted leadership with leader session ID Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}
> * 15:19:57 JM 2 reports the job has been suspended
> ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter Shutting down.}}
> ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}
> * 15:19:57 JM 2 reports it has lost leadership:
> ** {{Associated JobManager Actor[akka://flink/user/jobmanager#33755521] lost leader status}}
> ** {{Received leader address but not running in leader ActorSystem. Cancelling registration.}}
> * 15:19:57 TMs register with JM 1
> * 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs:
> ** {{Attempting to recover all jobs.}}
> ** {{There are 2 jobs to recover. Starting the job recovery.}}
> ** {{Attempting to recover job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}}
> ** {{Attempting to recover job {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}}
> * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form:
> ** {{Got user-level KeeperException when processing sessionid:0x2000001d2330001 type:create cxid:0x4211 zxid:0x60009dc70 txntype:-1 reqpath:n/a Error Path:/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 Error:KeeperErrorCode = NodeExists for /flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1}}
> ** {{Got user-level KeeperException when processing sessionid:0x2000001d2330001 type:create cxid:0x4230 zxid:0x60009dc78 txntype:-1 reqpath:n/a Error Path:/flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2 Error:KeeperErrorCode = NodeExists for /flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2}}
> * 15:20:08 JM 1 starts to recover job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}
> ** {{Recovered SubmittedJobGraph(2a4eff355aef849c5ca37dbac04f2ff1, JobInfo(clients: Set((Actor[akka.tcp://flink@ip-10-210-42-62.ec2.internal:37564/temp/$c],DETACHED)), start: 1528833686265)).}}
> ** {{Submitting recovered job 2a4eff355aef849c5ca37dbac04f2ff1.}}
> ** {{Submitting job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job) (Recovery).}}
> ** {{Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=30000) for 2a4eff355aef849c5ca37dbac04f2ff1.}}
> ** {{Successfully ran initialization on master in 0 ms.}}
> ** {{Job recovers via failover strategy: full graph restart}}
> ** {{Running initialization on master for job Some Job (2a4eff355aef849c5ca37dbac04f2ff1).}}
> ** {{Initialized in '/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1'.}}
> ** {{Using application-defined state backend for checkpoint/savepoint metadata: File State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
> ** {{Persisting periodic checkpoints externally at s3://bucket/flink/cluster_1/checkpoints-external.}}
> ** {{Recovering checkpoints from ZooKeeper.}}
> ** {{Found 3 checkpoints in ZooKeeper.}}
> ** {{Trying to retrieve checkpoint 69255.}}
> ** {{Trying to fetch 3 checkpoints from storage.}}
> ** {{Trying to retrieve checkpoint 69256.}}
> ** {{Trying to retrieve checkpoint 69257.}}
> ** {{Restoring from latest valid checkpoint: Checkpoint 69257 @ 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.}}
> ** {{Scheduling job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job).}}
> ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state CREATED to RUNNING.}}
> ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RUNNING to FAILING.}} {{org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.}}
> * 15:20:09 JM 1 starts to recover job {color:#d04437}61bca496065cd05e4263070a5e923a05{color}
> ** {{Recovered SubmittedJobGraph(61bca496065cd05e4263070a5e923a05, JobInfo(clients: Set((Actor[akka.tcp://flink@ip-10-210-22-167.ec2.internal:41001/temp/$c],DETACHED)), start: 1525728377900)).}}
> ** {{Submitting recovered job 61bca496065cd05e4263070a5e923a05.}}
> ** {{Submitting job 61bca496065cd05e4263070a5e923a05 (Some Job) (Recovery).}}
> ** {{Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=30000) for 61bca496065cd05e4263070a5e923a05.}}
> ** {{Job recovers via failover strategy: full graph restart}}
> ** {{Successfully ran initialization on master in 0 ms.}}
> ** {{Running initialization on master for job Some Job (61bca496065cd05e4263070a5e923a05).}}
> ** {{Initialized in '/checkpoints/61bca496065cd05e4263070a5e923a05'.}}
> ** {{Using application-defined state backend for checkpoint/savepoint metadata: File State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
> ** {{Persisting periodic checkpoints externally at s3://bucket/flink/cluster_1/checkpoints-external.}}
> ** {{Recovering checkpoints from ZooKeeper.}}
> ** {{Scheduling job 61bca496065cd05e4263070a5e923a05 (Some Job).}}
> ** {{Job Some Job (61bca496065cd05e4263070a5e923a05) switched from state CREATED to RUNNING.}}
> ** {{Trying to fetch 0 checkpoints from storage}}
> ** {{Found 0 checkpoints in ZooKeeper.}}
> * 15:20:09 JM 1 reports a number of metric name collisions because of the two jobs:
> ** {{Name collision: Group already contains a Metric with the name 'lastCheckpointSize'. Metric will not be reported.[jobmanager, job]}}
> ** {{Name collision: Group already contains a Metric with the name 'lastCheckpointAlignmentBuffered'. Metric will not be reported.[jobmanager, job]}}
> ** etc.
> * 15:20:39 JM 1 begins attempting to restart job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} repeatedly
> ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state FAILING to RESTARTING.}}
> ** {{Restarting the job Some Job (2a4eff355aef849c5ca37dbac04f2ff1).}}
> ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RESTARTING to CREATED.}}
> ** {{Recovering checkpoints from ZooKeeper.}}
> ** {{Found 3 checkpoints in ZooKeeper.}}
> ** {{Trying to fetch 3 checkpoints from storage.}}
> ** {{Trying to retrieve checkpoint 69255.}}
> ** {{Trying to retrieve checkpoint 69256.}}
> ** {{Trying to retrieve checkpoint 69257.}}
> ** {{Restoring from latest valid checkpoint: Checkpoint 69257 @ 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.}}
> ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state CREATED to RUNNING.}}
> ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RUNNING to FAILING.}}
> * 15:35:39 ZK 1 reestablishes connection with ZK 2 and ZK 3 and becomes a follower
>
> h4. Analysis
>
> The cluster was running job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}. The JM HA leader was JM 2, which was connected to ZK 1. ZK 1 was a follower in the ZK ensemble. The ZK leader was ZK 2.
> ZK 1 lost network connectivity for about 16 minutes. Upon losing connectivity to ZK 1, JM 2 gave up leadership and shut down the {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} job.
> JM 2 then immediately connected to ZK 2, without its ZK session having expired. Nonetheless, because it had given up leadership, JM 1 was elected the new JM leader.
> JM 1 then erroneously decided there were two jobs to restore: the previously running job, {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, and {color:#d04437}61bca496065cd05e4263070a5e923a05{color}. JM 1 found no checkpoints for {color:#d04437}61bca496065cd05e4263070a5e923a05{color}, so that job started right away. JM 1 attempted to restore {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} from the latest checkpoint, but the restore failed because there were not enough task slots in the cluster once the other job had started. Thus, {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} entered a restart loop.
> We manually stopped both jobs and started a new job based on the last known checkpoint for {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.
>
> Job {color:#d04437}61bca496065cd05e4263070a5e923a05{color} is an old job that we ran for a month between May 7th and June 5th.
> After our manual intervention, the HA state directory in S3 looks like this:
> {{s3cmd ls s3://bucket/flink/cluster_1/recovery/}}
> {{ DIR s3://bucket/flink/cluster_1/recovery/some_job/}}
> {{2018-07-31 17:33 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5}}
> {{2018-07-31 17:34 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb}}
> {{2018-07-31 17:32 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02}}
> {{2018-06-12 20:01 284626 s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec}}
> {{2018-07-30 23:01 285257 s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c}}
> submittedJobGraph7f627a661cec appears to be job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, the long-running job that failed during the ZK failover.
> submittedJobGraphf3767780c00c appears to be job {color:#205081}d77948df92813a68ea6dfd6783f40e7e{color}, the job we started restoring from a checkpoint after shutting down the duplicate jobs.
>
> A few questions come to mind.
> h5. Why does the JM terminate running jobs when it can immediately connect to another ZK node and its ZK session has not expired?
> This seems to be a result of using the LeaderLatch recipe in Curator. Its [docs|https://github.com/Netflix/curator/wiki/Leader-Latch] state:
> {quote}LeaderLatch instances add a ConnectionStateListener to watch for connection problems. If SUSPENDED or LOST is reported, the LeaderLatch that is the *leader will report that it is no longer the leader* (i.e. there will not be a leader until the connection is re-established). If a LOST connection is RECONNECTED, the LeaderLatch *will delete its previous ZNode and create a new one*.
> Users of LeaderLatch must take account that connection issues can cause leadership to be lost. i.e. hasLeadership() returns true but some time later the connection is SUSPENDED or LOST. At that point hasLeadership() will return false. It is highly recommended that LeaderLatch users register a ConnectionStateListener.
> {quote}
> So not only is leadership lost while disconnected, it will likely remain lost upon reconnection as a result of the znode deletion and recreation.
> This can probably be solved by using the Curator LeaderElection recipe instead, which [states|https://github.com/Netflix/curator/wiki/Leader-Election]:
> {quote}The {{LeaderSelectorListener}} class extends {{ConnectionStateListener}}. When the LeaderSelector is started, it adds the listener to the Curator instance. Users of the {{LeaderSelector}} must pay attention to any connection state changes. If an instance becomes the leader, it should respond to notification of being SUSPENDED or LOST.
> If the SUSPENDED state is reported, the instance must assume that it might no longer be the leader until it receives a RECONNECTED state. If the LOST state is reported, the instance is no longer the leader and its {{takeLeadership}} method should exit.
> {quote}
> So with LeaderElection it seems that what to do during the SUSPENDED state is left up to the application, which may choose to continue being leader until the state becomes LOST.
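> A minimal sketch of that approach, assuming the Curator {{LeaderSelector}} API (untested; the connect string and election path are placeholders). {{LeaderSelectorListenerAdapter}}'s default {{stateChanged}} cancels leadership on SUSPENDED as well as LOST, so the override below relinquishes only on LOST:
> {code:java}
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.CuratorFrameworkFactory;
> import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
> import org.apache.curator.framework.recipes.leader.LeaderSelector;
> import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.retry.ExponentialBackoffRetry;
>
> public class SuspendTolerantLeader extends LeaderSelectorListenerAdapter {
>     @Override
>     public void takeLeadership(CuratorFramework client) throws Exception {
>         // Leadership is held for as long as this method does not return;
>         // Curator interrupts this thread when leadership is cancelled.
>         Thread.currentThread().join();
>     }
>
>     @Override
>     public void stateChanged(CuratorFramework client, ConnectionState newState) {
>         // Only relinquish on LOST, accepting that this instance may briefly
>         // believe it is still leader while the connection is SUSPENDED.
>         if (newState == ConnectionState.LOST) {
>             throw new CancelLeadershipException();
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         CuratorFramework client = CuratorFrameworkFactory.newClient(
>                 "zk-1:2181,zk-2:2181,zk-3:2181",  // placeholder ensemble
>                 new ExponentialBackoffRetry(1_000, 3));
>         client.start();
>         LeaderSelector selector = new LeaderSelector(
>                 client, "/flink/cluster_a/leader-election", new SuspendTolerantLeader());
>         selector.autoRequeue();  // re-enter the election if leadership is relinquished
>         selector.start();
>     }
> }
> {code}
> The trade-off is explicit here: while SUSPENDED, this instance may transiently believe it is still leader, which is exactly the danger discussed next.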
> Obviously there are dangers in doing so, but at the very least you would expect the JM not to give up leadership until it has tried to connect to the other ZK members within the remaining ZK session timeout.
> This problem has been [previously discussed|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Zookeeper-failure-handling-td19611.html] on the mailing list, which led to FLINK-6174 and this [PR|https://github.com/apache/flink/pull/3599], which appears to be a modification of the Curator LeaderLatch recipe. It also led to FLINK-5703, which seems to be an attempt to keep jobs running during JM failover. While that is a valuable addition, I argue it is not required to avoid this failure, as the previous leader can remain leader so long as it connects to another ZK server before its ZK session expires.
>
> h5. Why did JM 1 resurrect the old job?
> Something must have been off with the state stored in the S3 HA recovery directory. The job must not have been fully removed.
> In fact, right now the recovery directory contains the file submittedJobGraph7f627a661cec, which appears to be job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}. Is that expected? That job is no longer running. Shouldn't that file no longer exist in the recovery directory?
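> One way to investigate is to list which job graphs ZooKeeper still references and compare them against the recovery directory. A sketch under the assumption that the cluster uses the default {{high-availability.zookeeper.path.jobgraphs}} of {{/jobgraphs}} beneath the {{/flink/cluster_a}} root seen in the znode paths above (the connect string is a placeholder):
> {code:java}
> import java.util.List;
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.CuratorFrameworkFactory;
> import org.apache.curator.retry.ExponentialBackoffRetry;
>
> public class ListJobGraphs {
>     public static void main(String[] args) throws Exception {
>         CuratorFramework client = CuratorFrameworkFactory.newClient(
>                 "zk-1:2181,zk-2:2181,zk-3:2181",  // placeholder ensemble
>                 new ExponentialBackoffRetry(1_000, 3));
>         client.start();
>
>         // Each child znode is a job ID whose state handle points at a
>         // submittedJobGraph* file in the S3 recovery directory.
>         List<String> jobIds = client.getChildren().forPath("/flink/cluster_a/jobgraphs");
>         jobIds.forEach(System.out::println);
>     }
> }
> {code}
> If the listing still contains the ID of a job that is no longer running, its znode would keep the corresponding submittedJobGraph file alive in S3, which would explain the job being resurrected on the next failover.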