[ https://issues.apache.org/jira/browse/FLINK-34672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828221#comment-17828221 ]

Matthias Pohl edited comment on FLINK-34672 at 3/19/24 9:28 AM:
----------------------------------------------------------------

The two locks need to be nested because we need to check both whether 
leadership is granted and whether the instance is still running. But we should 
always treat the leadership lock as the higher-priority lock (i.e., acquire it 
first) so that every code path acquires the two locks in the same order.
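A minimal sketch of that ordering rule, using plain Java monitors. The class, field, and method names (`LockOrderingSketch`, `leadershipLock`, `runnerLock`, `runIfLeaderAndRunning`) are hypothetical stand-ins, not Flink code. Every path that needs both locks takes the leadership lock first, so two threads can never end up waiting on each other in a cycle.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LockOrderingSketch {
    // Hypothetical stand-ins for the election service's lock and the runner's lock.
    private final Object leadershipLock = new Object();
    private final Object runnerLock = new Object();

    private boolean hasLeadership = true; // guarded by leadershipLock
    private boolean running = true;       // guarded by runnerLock

    /** Checks both conditions while holding both locks, leadership lock first. */
    boolean runIfLeaderAndRunning(Runnable action) {
        synchronized (leadershipLock) {
            synchronized (runnerLock) {
                if (hasLeadership && running) {
                    action.run();
                    return true;
                }
                return false;
            }
        }
    }

    /** Revocation uses the same acquisition order, so no lock-order inversion is possible. */
    void revokeLeadership() {
        synchronized (leadershipLock) {
            hasLeadership = false;
            synchronized (runnerLock) {
                running = false;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockOrderingSketch sketch = new LockOrderingSketch();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 10_000; i++) {
            pool.execute(() -> sketch.runIfLeaderAndRunning(() -> { }));
        }
        pool.execute(sketch::revokeLeadership);
        pool.shutdown();
        boolean finished = pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("finished without deadlock: " + finished);
    }
}
```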

The {{LeaderElection}}/{{LeaderElectionService}} interface is flawed in this 
sense. We shouldn't expose {{hasLeadership}}; instead, the interface should 
provide a {{runLeaderOperation}} method that abstracts away the leadership 
check. In {{DefaultLeaderElectionService}}, all these operations would be 
handled by the leader-operation executor. That's how we can ensure that the 
leadership lock is always acquired first.
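A rough sketch of what such an interface could look like, assuming leadership is identified by a session {{UUID}}. `LeaderOperationRunner`, `SingleThreadedLeaderOperationRunner`, and the `runLeaderOperation(Runnable, UUID)` signature are hypothetical and not Flink's API. The leadership check and the operation both run on one single-threaded executor, so the contender never needs to take the election lock itself.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical interface: contenders never query leadership directly. */
interface LeaderOperationRunner {
    /**
     * Runs {@code operation} only if {@code leaderSessionId} is still the current leader
     * session. Completes with {@code false} if leadership was lost before the operation ran.
     */
    CompletableFuture<Boolean> runLeaderOperation(Runnable operation, UUID leaderSessionId);
}

/** Sketch: leadership changes, checks, and operations are serialized on one thread. */
class SingleThreadedLeaderOperationRunner implements LeaderOperationRunner, AutoCloseable {
    private final ExecutorService leaderOperationExecutor = Executors.newSingleThreadExecutor();
    // Only read and written on leaderOperationExecutor, so no lock is needed.
    private UUID currentLeaderSessionId;

    void grantLeadership(UUID sessionId) {
        leaderOperationExecutor.execute(() -> currentLeaderSessionId = sessionId);
    }

    void revokeLeadership() {
        leaderOperationExecutor.execute(() -> currentLeaderSessionId = null);
    }

    @Override
    public CompletableFuture<Boolean> runLeaderOperation(Runnable operation, UUID leaderSessionId) {
        return CompletableFuture.supplyAsync(() -> {
            if (!leaderSessionId.equals(currentLeaderSessionId)) {
                return false; // leadership was revoked (or never granted) before we ran
            }
            operation.run();
            return true;
        }, leaderOperationExecutor);
    }

    @Override
    public void close() {
        leaderOperationExecutor.shutdown();
    }
}
```

Because grants, revocations, and operations are queued on the same thread in FIFO order, a revocation submitted before an operation is always observed by it, and there is no second lock that a contender could acquire in the wrong order.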

This is only an issue in the {{JobMasterServiceLeadershipRunner}} because the 
JobMaster creation is executed asynchronously in an I/O thread. The other place 
where the contender checks whether it has acquired leadership is the 
{{DefaultDispatcherRunner}}. But we're not doing any async calls there during 
leadership handling (the {{DefaultDispatcherRunner}} is created directly in the 
leader-operation executor while handling the leadership-acquired event).
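Why the I/O thread ends up running the callback (and holding the runner lock) comes down to how non-async {{CompletableFuture}} stages are executed: a callback registered on a future that is not yet complete typically runs in whichever thread later completes it, while a callback registered on an already-completed future runs immediately in the registering thread. The JDK Javadoc only says such actions "may" be performed by either thread; the sketch below reflects OpenJDK's behavior. `CallbackThreadSketch` and the thread names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackThreadSketch {
    /** Returns the name of the thread that ran a non-async whenComplete callback. */
    static String callbackThread(boolean completeBeforeRegistering) {
        ExecutorService ioExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        try {
            CompletableFuture<String> future = new CompletableFuture<>();
            if (completeBeforeRegistering) {
                future.complete("done");
            }
            AtomicReference<String> ranOn = new AtomicReference<>();
            // Non-async stage: no executor is given, so an already-involved thread runs it.
            CompletableFuture<String> next = future.whenComplete(
                    (value, error) -> ranOn.set(Thread.currentThread().getName()));
            if (!completeBeforeRegistering) {
                // Completing on the io thread runs the pending callback on that thread.
                CompletableFuture.runAsync(() -> future.complete("done"), ioExecutor).join();
            }
            next.join();
            return ranOn.get();
        } finally {
            ioExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().setName("caller");
        System.out.println(callbackThread(true));  // on OpenJDK: caller
        System.out.println(callbackThread(false)); // on OpenJDK: io-thread
    }
}
```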

I also verified that this was not introduced in Flink 1.18 with the 
[FLIP-285|https://cwiki.apache.org/confluence/display/FLINK/FLIP-285%3A+Refactoring+LeaderElection+to+make+Flink+support+multi-component+leader+election+out-of-the-box]
changes. AFAIS, it can also happen in 1.17 and earlier (I didn't check the 
pre-FLINK-24038 code; I only looked into {{release-1.17}}).



was (Author: mapohl):
The two locks shouldn't be nested because any leadership-related operation 
should run sequentially in the leadership operation executor.

I also verified that this is not something that was introduced in Flink 1.18 
with the 
[FLIP-285|https://cwiki.apache.org/confluence/display/FLINK/FLIP-285%3A+Refactoring+LeaderElection+to+make+Flink+support+multi-component+leader+election+out-of-the-box]
 changes. AFAIS, it can also happen in 1.17- (I didn't check the 
pre-FLINK-24038 code but only looked into {{release-1.17}}).

One solution would be to move the async callback of 
[JobMasterServiceLeadershipRunner#forwardIfValidLeader|https://github.com/apache/flink/blob/c9fcb0c74b1354f4f0f1b7c7f62191b8cc6b5725/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L547]
 into the leader-operation executor of the {{DefaultLeaderElectionService}} to 
force sequential execution of leadership-related operations.
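A minimal sketch of that idea: {{whenCompleteAsync(action, executor)}} hands the callback to the given executor no matter which thread completes the future. The single-threaded `leaderOperationExecutor` below is a hypothetical stand-in for the executor inside {{DefaultLeaderElectionService}}, and `ForwardOnLeaderExecutorSketch`/`forwardingThread` are illustrative names, not the actual wiring in Flink.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class ForwardOnLeaderExecutorSketch {
    /** Completes a future on an io thread and reports where the forwarding callback ran. */
    static String forwardingThread() {
        ExecutorService ioExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        // Hypothetical stand-in for the leader-operation executor.
        ExecutorService leaderOperationExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "leader-operation-executor"));
        try {
            CompletableFuture<String> jobMasterFuture = new CompletableFuture<>();
            AtomicReference<String> ranOn = new AtomicReference<>();
            // The callback is always submitted to leaderOperationExecutor, so it is
            // serialized with other leadership operations instead of running on the io thread.
            CompletableFuture<String> forwarded = jobMasterFuture.whenCompleteAsync(
                    (value, error) -> ranOn.set(Thread.currentThread().getName()),
                    leaderOperationExecutor);
            CompletableFuture.runAsync(
                    () -> jobMasterFuture.complete("job master ready"), ioExecutor).join();
            forwarded.join();
            return ranOn.get();
        } finally {
            ioExecutor.shutdown();
            leaderOperationExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(forwardingThread()); // leader-operation-executor
    }
}
```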

This is only an issue in the {{JobMasterServiceLeadershipRunner}} because we're 
executing the creation asynchronously in an io thread. The other place where we 
check within the contender whether leadership is acquired is the 
{{DefaultDispatcherRunner}}. But we're not doing any async calls there during 
leadership handling (the {{DefaultDispatcherRunner}} is created directly in the 
leader-operation executor while handling the leadership acquired event).

The {{LeaderElection}}/{{LeaderElectionService}} interface is flawed in this 
sense. We shouldn't expose the {{hasLeadership}} but have a 
{{runLeaderOperation}} method that abstracts away the leadership check. In 
{{DefaultLeaderElectionService}} all these operations would be handled by the 
leader-operation executor.

> HA deadlock between JobMasterServiceLeadershipRunner and 
> DefaultLeaderElectionService
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-34672
>                 URL: https://issues.apache.org/jira/browse/FLINK-34672
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.18.1
>            Reporter: Chesnay Schepler
>            Priority: Major
>             Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> We recently observed a deadlock in the JM within the HA system.
> (see below for the thread dump)
> [~mapohl] and I looked a bit into it and there appears to be a race condition 
> when leadership is revoked while a JobMaster is being started.
> It appears to be caused by 
> {{JobMasterServiceLeadershipRunner#createNewJobMasterServiceProcess}} 
> forwarding futures while holding a lock; depending on whether the forwarded 
> future is already complete the next stage may or may not run while holding 
> that same lock.
> We haven't determined yet whether we should be holding that lock or not.
> {code}
> "DefaultLeaderElectionService-leadershipOperationExecutor-thread-1" #131 daemon prio=5 os_prio=0 cpu=157.44ms elapsed=78749.65s tid=0x00007f531f43d000 nid=0x19d waiting for monitor entry  [0x00007f53084fd000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.runIfStateRunning(JobMasterServiceLeadershipRunner.java:462)
>         - waiting to lock <0x00000000f1c0e088> (a java.lang.Object)
>         at org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.revokeLeadership(JobMasterServiceLeadershipRunner.java:397)
>         at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.notifyLeaderContenderOfLeadershipLoss(DefaultLeaderElectionService.java:484)
>         at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService$$Lambda$1252/0x0000000840ddec40.accept(Unknown Source)
>         at java.util.HashMap.forEach(java.base/HashMap.java:1337)
>         at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onRevokeLeadershipInternal(DefaultLeaderElectionService.java:452)
>         at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService$$Lambda$1251/0x0000000840dcf840.run(Unknown Source)
>         at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.lambda$runInLeaderEventThread$3(DefaultLeaderElectionService.java:549)
>         - locked <0x00000000f0e3f4d8> (a java.lang.Object)
>         at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService$$Lambda$1075/0x0000000840c23040.run(Unknown Source)
>         at java.util.concurrent.CompletableFuture$AsyncRun.run(java.base/CompletableFuture.java:1736)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base/ThreadPoolExecutor.java:1128)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base/ThreadPoolExecutor.java:628)
>         at java.lang.Thread.run(java.base/Thread.java:829)
> {code}
> {code}
> "jobmanager-io-thread-1" #636 daemon prio=5 os_prio=0 cpu=125.56ms elapsed=78699.01s tid=0x00007f5321c6e800 nid=0x396 waiting for monitor entry  [0x00007f530567d000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.hasLeadership(DefaultLeaderElectionService.java:366)
>         - waiting to lock <0x00000000f0e3f4d8> (a java.lang.Object)
>         at org.apache.flink.runtime.leaderelection.DefaultLeaderElection.hasLeadership(DefaultLeaderElection.java:52)
>         at org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.isValidLeader(JobMasterServiceLeadershipRunner.java:509)
>         at org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner.lambda$forwardIfValidLeader$15(JobMasterServiceLeadershipRunner.java:520)
>         - locked <0x00000000f1c0e088> (a java.lang.Object)
>         at org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner$$Lambda$1320/0x0000000840e1a840.accept(Unknown Source)
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(java.base/CompletableFuture.java:859)
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(java.base/CompletableFuture.java:837)
>         at java.util.concurrent.CompletableFuture.postComplete(java.base/CompletableFuture.java:506)
>         at java.util.concurrent.CompletableFuture.complete(java.base/CompletableFuture.java:2079)
>         at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.registerJobMasterServiceFutures(DefaultJobMasterServiceProcess.java:124)
>         at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:114)
>         at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess$$Lambda$1319/0x0000000840e1a440.accept(Unknown Source)
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(java.base/CompletableFuture.java:859)
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(java.base/CompletableFuture.java:837)
>         at java.util.concurrent.CompletableFuture.postComplete(java.base/CompletableFuture.java:506)
>         at java.util.concurrent.CompletableFuture$AsyncSupply.run(java.base/CompletableFuture.java:1705)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base/ThreadPoolExecutor.java:1128)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base/ThreadPoolExecutor.java:628)
>         at java.lang.Thread.run(java.base/Thread.java:829)
> {code}
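The two thread dumps form a classic lock-order inversion: the leadership-operation thread holds {{<0x00000000f0e3f4d8>}} and waits for {{<0x00000000f1c0e088>}}, while the I/O thread holds {{<0x00000000f1c0e088>}} and waits for {{<0x00000000f0e3f4d8>}}. The sketch below reproduces that shape with two plain monitors, uses a {{CountDownLatch}} to force the interleaving, and asks {{ThreadMXBean#findMonitorDeadlockedThreads}} to confirm the cycle. Only the thread names mirror the dumps; the class, method, and lock variable names are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

public class LockInversionSketch {
    /** Returns true if the JVM reports the two threads as deadlocked on monitors. */
    static boolean reproducesDeadlock() {
        // Stand-ins for <0x00000000f0e3f4d8> (election) and <0x00000000f1c0e088> (runner).
        Object electionLock = new Object();
        Object runnerLock = new Object();
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        Thread leaderOpThread = new Thread(() -> {
            synchronized (electionLock) {     // like runInLeaderEventThread
                awaitOther(bothHoldFirstLock);
                synchronized (runnerLock) {   // like runIfStateRunning: blocks forever
                }
            }
        }, "leadershipOperationExecutor");
        Thread ioThread = new Thread(() -> {
            synchronized (runnerLock) {       // like the forwardIfValidLeader callback
                awaitOther(bothHoldFirstLock);
                synchronized (electionLock) { // like hasLeadership: blocks forever
                }
            }
        }, "jobmanager-io-thread");
        // Daemon threads, so the deadlocked pair does not keep the JVM alive.
        leaderOpThread.setDaemon(true);
        ioThread.setDaemon(true);
        leaderOpThread.start();
        ioThread.start();

        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 100; attempt++) {
            long[] ids = threadBean.findMonitorDeadlockedThreads(); // null if no cycle
            if (ids != null
                    && Arrays.stream(ids).anyMatch(id -> id == leaderOpThread.getId())
                    && Arrays.stream(ids).anyMatch(id -> id == ioThread.getId())) {
                return true;
            }
            sleepQuietly(50);
        }
        return false;
    }

    /** Signals that this thread holds its first lock, then waits for the other thread. */
    private static void awaitOther(CountDownLatch latch) {
        latch.countDown();
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("deadlock detected: " + reproducesDeadlock());
    }
}
```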



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
