[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564573#comment-16564573
 ] 

Elias Levy commented on FLINK-10011:
------------------------------------

[~trohrm...@apache.org] what do you think?

> Old job resurrected during HA failover
> --------------------------------------
>
>                 Key: FLINK-10011
>                 URL: https://issues.apache.org/jira/browse/FLINK-10011
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.4.2
>            Reporter: Elias Levy
>            Priority: Blocker
>
> For the second time we've observed Flink resurrect an old job during 
> JobManager high-availability fail over.
> h4. Configuration
>  * AWS environment
>  * Flink 1.4.2 standalone cluster in HA mode
>  * 2 JMs, 3 TMs
>  * 3 node ZK ensemble
>  * 1 job consuming from and producing to Kafka
>  * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline 
>  * 15:18:10 JM 2 completes checkpoint 69256.
>  * 15:19:10 JM 2 completes checkpoint 69257.
>  * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a 
> SocketTimeoutException
>  * 15:19:57 ZK 1 closes connection to JM 2 (leader)
>  * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 
> 1
>  * 15:19:57 JM 2 reports it can't read data from ZK
>  ** {{Unable to read additional data from server sessionid 0x30000003f4a0003, 
> likely server has closed socket, closing socket connection and attempting 
> reconnect)}}
>  ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
>  * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader 
> from ZooKeeper.}}
>  ** {{ZooKeeper connection SUSPENDED. Changes to the submitted job graphs 
> are not monitored (temporarily).}}
>  ** {{Connection to ZooKeeper suspended. The contender 
> akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in 
> the leader election}}
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader 
> from ZooKeeper.}}
>  * 15:19:57 JM 2 gives up leadership
>  ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked 
> leadership.}}
>  * 15:19:57 JM 2 changes job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
>  ** {{Stopping checkpoint coordinator for job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
>  * 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages 
> because there is no leader
>  ** {{Discard message 
> LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception:
>  TaskManager akka://flink/user/taskmanager is disassociating)) because there 
> is currently no valid leader id known.}}
>  * 15:19:57 JM 2 connects to ZK 2 and renews its session
>  ** {{Opening socket connection to server 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
>  ** {{Socket connection established to 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
>  ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be 
> restarted.}}
>  ** {{Session establishment complete on server 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 
> 0x30000003f4a0003, negotiated timeout = 40000}}
>  ** {{Connection to ZooKeeper was reconnected. Leader election can be 
> restarted.}}
>  ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs 
> are monitored again.}}
>  ** {{State change: RECONNECTED}}
>  * 15:19:57 JM 1 reports it has been granted leadership:
>  ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted 
> leadership with leader session ID 
> Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}
>  * 15:19:57 JM 2 reports the job has been suspended
>  ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter 
> Shutting down.}}
>  ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}
>  * 15:19:57 JM 2 reports it has lost leadership:
>  ** {{Associated JobManager 
> Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}}
>  ** {{Received leader address but not running in leader ActorSystem. 
> Cancelling registration.}}
>  * 15:19:57 TMs register with JM 1
>  * 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs:
>  ** {{Attempting to recover all jobs.}}
>  ** {{There are 2 jobs to recover. Starting the job recovery.}}
>  ** {{Attempting to recover job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}}
>  ** {{Attempting to recover job 
> {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}}
>  * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form:
>  ** {{Got user-level KeeperException when processing 
> sessionid:0x2000001d2330001 type:create cxid:0x4211 zxid:0x60009dc70 
> txntype:-1 reqpath:n/a Error 
> Path:/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 
> Error:KeeperErrorCode = NodeExists for 
> /flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1}}
>  ** {{Got user-level KeeperException when processing 
> sessionid:0x2000001d2330001 type:create cxid:0x4230 zxid:0x60009dc78 
> txntype:-1 reqpath:n/a Error 
> Path:/flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2
>  Error:KeeperErrorCode = NodeExists for 
> /flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2}}
>  * 15:29:08 JM 1 starts to recover job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}
>  ** {{Recovered SubmittedJobGraph(2a4eff355aef849c5ca37dbac04f2ff1, 
> JobInfo(clients: 
> Set((Actor[akka.tcp://flink@ip-10-210-42-62.ec2.internal:37564/temp/$c],DETACHED)),
>  start: 1528833686265)).}}
>  ** {{Submitting recovered job 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Submitting job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job) (Recovery).}}
>  ** {{Using restart strategy 
> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, 
> delayBetweenRestartAttempts=30000) for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Successfully ran initialization on master in 0 ms.}}
>  ** {{Job recovers via failover strategy: full graph restart}}
>  ** {{Running initialization on master for job Some Job 
> (2a4eff355aef849c5ca37dbac04f2ff1).}}
>  ** {{Initialized in '/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1'.}}
>  ** {{Using application-defined state backend for checkpoint/savepoint 
> metadata: File State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
>  ** {{Persisting periodic checkpoints externally at 
> s3://bucket/flink/cluster_1/checkpoints-external.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Found 3 checkpoints in ZooKeeper.}}
>  ** {{Trying to retrieve checkpoint 69255.}}
>  ** {{Trying to fetch 3 checkpoints from storage.}}
>  ** {{Trying to retrieve checkpoint 69256.}}
>  ** {{Trying to retrieve checkpoint 69257.}}
>  ** {{Restoring from latest valid checkpoint: Checkpoint 69257 @ 
> 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Scheduling job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job).}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state 
> CREATED to RUNNING.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state 
> RUNNING to FAILING.}}{{ 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Not enough free slots available to run the job. You can decrease the operator 
> parallelism or increase the number of slots per TaskManager in the 
> configuration.}}
>  * 15:20:09 JM 1 starts to recover 
> {color:#d04437}61bca496065cd05e4263070a5e923a05{color}
>  ** {{Recovered SubmittedJobGraph(61bca496065cd05e4263070a5e923a05, 
> JobInfo(clients: 
> Set((Actor[akka.tcp://flink@ip-10-210-22-167.ec2.internal:41001/temp/$c],DETACHED)),
>  start: 1525728377900)).}}
>  ** {{Submitting recovered job 61bca496065cd05e4263070a5e923a05.}}
>  ** {{Submitting job 61bca496065cd05e4263070a5e923a05 (Some Job) (Recovery).}}
>  ** {{Using restart strategy 
> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, 
> delayBetweenRestartAttempts=30000) for 61bca496065cd05e4263070a5e923a05.}}
>  ** {{Job recovers via failover strategy: full graph restart}}
>  ** {{Successfully ran initialization on master in 0 ms.}}
>  ** {{Running initialization on master for job Some Job 
> (61bca496065cd05e4263070a5e923a05).}}
>  ** {{Initialized in '/checkpoints/61bca496065cd05e4263070a5e923a05'.}}
>  ** {{Using application-defined state backend for checkpoint/savepoint 
> metadata: File State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
>  ** {{Persisting periodic checkpoints externally at 
> s3://bucket/flink/cluster_1/checkpoints-external.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Scheduling job 61bca496065cd05e4263070a5e923a05 (Some Job).}}
>  ** {{Job Some Job (61bca496065cd05e4263070a5e923a05) switched from state 
> CREATED to RUNNING.}}
>  ** {{Trying to fetch 0 checkpoints from storage}}
>  ** {{Found 0 checkpoints in ZooKeeper.}}
>  * 15:20:09 JM 1 reports a bunch of metric collisions because of the two jobs:
>  ** {{Name collision: Group already contains a Metric with the name 
> 'lastCheckpointSize'. Metric will not be reported.[jobmanager, job]}}
>  ** {{Name collision: Group already contains a Metric with the name 
> 'lastCheckpointAlignmentBuffered'. Metric will not be reported.[jobmanager, 
> job]}}
>  ** etc
>  * 15:20:39 JM 1 begins attempting to restart the 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} job repeatedly
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state 
> FAILING to RESTARTING.}}
>  ** {{Restarting the job Some Job (2a4eff355aef849c5ca37dbac04f2ff1).}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state 
> RESTARTING to CREATED.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Found 3 checkpoints in ZooKeeper.}}
>  ** {{Trying to fetch 3 checkpoints from storage.}}
>  ** {{Trying to retrieve checkpoint 69255.}}
>  ** {{Trying to retrieve checkpoint 69256.}}
>  ** {{Trying to retrieve checkpoint 69257.}}
>  ** {{Restoring from latest valid checkpoint: Checkpoint 69257 @ 
> 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state 
> CREATED to RUNNING.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state 
> RUNNING to FAILING.}}
>  * 15:35:39 ZK 1 reestablishes connection with ZK 2 and 3 and becomes a 
> follower
>  
> h4. Analysis
>  
> The cluster was running job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.  The JM HA leader was 
> JM 2, which was connected to ZK 1.  ZK 1 was a follower in the ZK ensemble.  
> The ZK leader was ZK 2.  
> ZK 1 lost network connectivity for about 16 minutes.  Upon loss of 
> connectivity to ZK 1, JM 2 gave up leadership and shut down the 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} job.  JM 2 then 
> immediately connected to ZK 2, without its ZK session having expired.  
> Nonetheless, because JM 2 had given up leadership, JM 1 was elected the new 
> JM leader.
> JM 1 then erroneously decided there were two jobs to restore: the previously 
> running job, {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, and 
> {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.  JM 1 found no 
> checkpoints for {color:#d04437}61bca496065cd05e4263070a5e923a05{color}, so 
> that job started right away.  JM 1 attempted to restore 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} from the latest 
> checkpoint, but it failed because there weren't enough task slots in the 
> cluster as a result of the other job starting.  Thus, 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} entered a restart loop.
> We manually stopped both jobs and started a new job based on the last known 
> checkpoint for {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.
>  
> Job {color:#d04437}61bca496065cd05e4263070a5e923a05{color}  is an old job 
> that we ran for a month between May 7th and June 5th.
> After our manual intervention, the HA state directory in S3 looks like 
> this:
> {{s3cmd ls s3://bucket/flink/cluster_1/recovery/}}
> {{ DIR s3://bucket/flink/cluster_1/recovery/some_job/}}
> {{2018-07-31 17:33 35553 
> s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5}}
> {{2018-07-31 17:34 35553 
> s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb}}
> {{2018-07-31 17:32 35553 
> s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02}}
> {{2018-06-12 20:01 284626 
> s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec}}
> {{2018-07-30 23:01 285257 
> s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c}}
> submittedJobGraph7f627a661cec appears to be job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, the long-running job 
> that failed during the ZK failover.
> submittedJobGraphf3767780c00c appears to be job 
> {color:#205081}d77948df92813a68ea6dfd6783f40e7e{color}, the job we started 
> by restoring from a checkpoint after shutting down the duplicate jobs.
>  
> A few questions come to mind.
> h5. Why does the JM terminate running jobs when it can immediately connect to 
> another ZK node and its ZK session has not expired?
> This seems to be a result of using the LeaderLatch recipe in Curator.  Its 
> [docs|https://github.com/Netflix/curator/wiki/Leader-Latch] state: 
> {quote}LeaderLatch instances add a ConnectionStateListener to watch for 
> connection problems. If SUSPENDED or LOST is reported, the LeaderLatch that 
> is the *leader will report that it is no longer the leader* (i.e. there will 
> not be a leader until the connection is re-established). If a LOST connection 
> is RECONNECTED, the LeaderLatch *will delete its previous ZNode and create a 
> new one*.
> Users of LeaderLatch must take account that connection issues can cause 
> leadership to be lost. i.e. hasLeadership() returns true but some time later 
> the connection is SUSPENDED or LOST. At that point hasLeadership() will 
> return false. It is highly recommended that LeaderLatch users register a 
> ConnectionStateListener.
> {quote}
> So not only is leadership lost while disconnected, but it will likely stay 
> lost when reconnecting as a result of the znode deletion and recreation.
> This can probably be solved by using the Curator LeaderElection recipe 
> instead, which 
> [states|https://github.com/Netflix/curator/wiki/Leader-Election]:
> {quote}The {{LeaderSelectorListener}} class extends 
> {{ConnectionStateListener}}. When the LeaderSelector is started, it adds the 
> listener to the Curator instance. Users of the {{LeaderSelector}} must pay 
> attention to any connection state changes. If an instance becomes the leader, 
> it should respond to notification of being SUSPENDED or LOST.
> If the SUSPENDED state is reported, the instance must assume that it might no 
> longer be the leader until it receives a RECONNECTED state. If the LOST state 
> is reported, the instance is no longer the leader and its {{takeLeadership}} 
> method should exit.
> {quote}
> So with LeaderElection it seems that what to do during the SUSPENDED state is 
> left up to the application, which may choose to continue being leader until 
> the state becomes LOST.
> Obviously there are dangers with doing so, but at the very least you would 
> expect the JM not to give up leadership until it tried to connect to other ZK 
> members within the remaining ZK session timeout.
> This problem has been [previously 
> discussed|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Zookeeper-failure-handling-td19611.html]
>  on the mailing list, which led to FLINK-6174 and this 
> [PR|https://github.com/apache/flink/pull/3599], which appears to be a 
> modification of the Curator LeaderLatch recipe.  It also led to FLINK-5703, 
> which seems like an attempt to keep jobs running during JM failover.  While 
> that is a valuable addition, I argue that it is not required to avoid this 
> failure, as the previous leader can continue being leader so long as it 
> connects to a new ZK node before its ZK session expires.
>  
> h5. Why did JM 1 resurrect the old job?
> Something must have been off with the state stored in the S3 HA recovery 
> directory.  The job must not have been fully removed.  
> In fact, right now the recovery directory has the file 
> submittedJobGraph7f627a661cec which appears to be job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}. Is that expected?  
> That job is no longer running.  Shouldn't that file no longer exist in the 
> recovery directory?
>  
>  
>  
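The first question in the quoted description contrasts two reactions to a SUSPENDED connection: give up leadership at once (the LeaderLatch behaviour quoted from the Curator docs), or keep it until the connection is LOST or the session timeout has passed. The following is a minimal sketch of that difference only. It is a plain-Python model and does not use the Curator or Flink APIs; the names `ConnState`, `Leader`, `revoke_on_suspend`, and `check_deadline` are hypothetical. The 40000 ms timeout is the negotiated value from the log above; the 500 ms reconnect delay is an assumption, since the log only shows that suspension and reconnection happened within the same second. The sketch also ignores that session expiry is ultimately decided by the ZooKeeper server, which is part of the danger the description mentions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class ConnState(Enum):
    """Connection states named as in the quoted Curator documentation."""
    SUSPENDED = "SUSPENDED"
    RECONNECTED = "RECONNECTED"
    LOST = "LOST"


@dataclass
class Leader:
    """Hypothetical model of a current leader; not a Curator or Flink class."""
    session_timeout_ms: int
    revoke_on_suspend: bool  # True models the eager, latch-like reaction
    is_leader: bool = True
    suspended_at_ms: Optional[int] = None

    def on_state(self, state: ConnState, now_ms: int) -> None:
        if not self.is_leader:
            return  # re-election is out of scope for this sketch
        if state is ConnState.LOST:
            self.is_leader = False
        elif state is ConnState.SUSPENDED:
            if self.revoke_on_suspend:
                self.is_leader = False
            elif self.suspended_at_ms is None:
                self.suspended_at_ms = now_ms  # start the grace period
        elif state is ConnState.RECONNECTED:
            self.check_deadline(now_ms)
            self.suspended_at_ms = None

    def check_deadline(self, now_ms: int) -> None:
        """Give up leadership once the suspension has outlasted the timeout."""
        if (self.suspended_at_ms is not None
                and now_ms - self.suspended_at_ms >= self.session_timeout_ms):
            self.is_leader = False


if __name__ == "__main__":
    eager = Leader(session_timeout_ms=40000, revoke_on_suspend=True)
    tolerant = Leader(session_timeout_ms=40000, revoke_on_suspend=False)
    for leader in (eager, tolerant):
        leader.on_state(ConnState.SUSPENDED, 0)
        leader.on_state(ConnState.RECONNECTED, 500)  # assumed delay
    print(eager.is_leader, tolerant.is_leader)  # prints: False True
```

Under these assumptions the eager policy drops leadership on the brief suspension, which is what triggered the failover in the report, while the tolerant policy would have kept JM 2 as leader. A real implementation would also need to call something like `check_deadline` periodically while suspended, and would have to fence a stale leader that wrongly believes its session is still alive.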



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
