dongtingting created FLINK-11225:
------------------------------------
Summary: Error state of addedJobGraphs when Dispatcher with
concurrent revoking and granting leadership
Key: FLINK-11225
URL: https://issues.apache.org/jira/browse/FLINK-11225
Project: Flink
Issue Type: Bug
Components: Core, Distributed Coordination
Affects Versions: 1.6.2
Environment: flink 1.6.2 on yarn
Reporter: dongtingting
In some cases, for example when the AppMaster goes through a long full GC, the
Dispatcher has its leadership revoked and is then granted leadership again
almost immediately. As a result, Dispatcher 'revokeLeadership' and
'grantLeadership' can run concurrently. In ZooKeeperSubmittedJobGraphStore,
'recoverJobGraph' may then happen before 'releaseJobGraph', so addedJobGraphs
in ZooKeeperSubmittedJobGraphStore no longer contains the running job. When
the running job is later cancelled, its job graph cannot be removed from
ZooKeeper at $basedir/jobgraphs/$job_id. If the AppMaster restarts, it will
try to recover the cancelled job.
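The ordering problem can be illustrated with a small, self-contained sketch.
This is not the Flink implementation: ToyJobGraphStore, its in-memory sets and
jobGraphSurvivesCancel are hypothetical names made up for this example, and the
sketch assumes that removal is skipped for jobs that are not in addedJobGraphs,
which matches the behaviour described in this report.

```java
import java.util.HashSet;
import java.util.Set;

class JobGraphStoreRaceSketch {

    /** Hypothetical in-memory stand-in for the ZooKeeper-backed job graph store. */
    static final class ToyJobGraphStore {
        // Stands in for the znodes under $basedir/jobgraphs.
        private final Set<String> persisted = new HashSet<>();
        // Jobs this store instance believes it currently owns.
        private final Set<String> addedJobGraphs = new HashSet<>();

        synchronized void putJobGraph(String jobId) {
            persisted.add(jobId);
            addedJobGraphs.add(jobId);
        }

        synchronized void recoverJobGraph(String jobId) {
            if (persisted.contains(jobId)) {
                addedJobGraphs.add(jobId);
            }
        }

        synchronized void releaseJobGraph(String jobId) {
            addedJobGraphs.remove(jobId);
        }

        // Assumption: removal is skipped for jobs missing from addedJobGraphs.
        synchronized void removeJobGraph(String jobId) {
            if (addedJobGraphs.remove(jobId)) {
                persisted.remove(jobId);
            }
        }

        synchronized boolean isPersisted(String jobId) {
            return persisted.contains(jobId);
        }
    }

    /** Replays one interleaving and reports whether the job graph is left behind. */
    static boolean jobGraphSurvivesCancel(boolean recoverBeforeRelease) {
        String jobId = "2a16bfa299b56432e1141df3b1361fbc";
        ToyJobGraphStore store = new ToyJobGraphStore();
        store.putJobGraph(jobId);            // job was submitted and is running

        if (recoverBeforeRelease) {
            store.recoverJobGraph(jobId);    // grantLeadership path runs first
            store.releaseJobGraph(jobId);    // revokeLeadership path finishes late
        } else {
            store.releaseJobGraph(jobId);    // expected order: release, then recover
            store.recoverJobGraph(jobId);
        }

        store.removeJobGraph(jobId);         // the user cancels the job
        return store.isPersisted(jobId);
    }

    public static void main(String[] args) {
        System.out.println("release then recover, left behind: " + jobGraphSurvivesCancel(false));
        System.out.println("recover then release, left behind: " + jobGraphSurvivesCancel(true));
    }
}
```

In this model the job graph is cleaned up only when the release happens before
the recovery. With the reversed order, the late release drops the job from
addedJobGraphs and the later removal becomes a no-op.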
Log for this case:
2018-12-08 21:12:03,729 INFO
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn[main-SendThread(******:2181)]
- Client session timed out, have not heard from server in 40082ms for
sessionid 0x1657682ceee6082, closing socket connection and attempting reconnect
2018-12-08 21:12:03,978 INFO
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager[main-EventThread]
- State change: SUSPENDED
2018-12-08 21:12:03,980 WARN
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[Curator-ConnectionStateManager-0]
- Connection to ZooKeeper suspended. Can no longer retrieve the leader from
ZooKeeper.
2018-12-08 21:12:03,980 WARN
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService[Curator-ConnectionStateManager-0]
- Connection to
ZooKeeper suspended. The contender http://***:*** no longer participates in
the leader election.
2018-12-08 21:12:03,980 WARN
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[Curator-ConnectionStateManager-0]
- Connection
to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2018-12-08 21:12:03,981 WARN
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService[Curator-ConnectionStateManager-0]
- Connection to
ZooKeeper suspended. The contender
akka.tcp://[email protected]:44815/user/resourcemanager no longer
participates in the leader election.
2018-12-08 21:12:03,982 INFO
org.apache.flink.runtime.jobmaster.JobManagerRunner
[Curator-ConnectionStateManager-0] - JobManager for job job_***
(2a16bfa299b56432e1141df3b1361fbc) was revoked leadership at
akka.tcp://flink@****:***/user/jobmanager_0.
2018-12-08 21:12:03,984 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[flink-akka.actor.default-dispatcher-186]
- Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2018-12-08 21:12:03,986 INFO
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
[Curator-ConnectionStateManager-0] - http://****:*** lost leadership
2018-12-08 21:12:03,986 INFO org.apache.flink.yarn.YarnResourceManager
[flink-akka.actor.default-dispatcher-287] - ResourceManager
akka.tcp://flink@****:***/user/resourcemanager was revoked leadership. Clearing
fencing token.
2018-12-08 21:12:03,986 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[flink-akka.actor.default-dispatcher-287]
- Stopping ZooKeeperLeaderRetrievalService
/leader/2a16bfa299b56432e1141df3b1361fbc/job_manager_lock.
2018-12-08 21:12:03,990 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher
[flink-akka.actor.default-dispatcher-281] - Stopping all currently running
jobs of dispatcher akka.tcp://flink@****:***/user/dispatcher.
2018-12-08 21:12:04,181 INFO
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn[main-SendThread(10.54.33.12:2181)]
- Session establishment complete on server ****/****:2181, sessionid =
0x1657682ceee6082, negotiated timeout = 60000
2018-12-08 21:12:04,181 INFO
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager[main-EventThread]
- State change: RECONNECTED
2018-12-08 21:12:04,188 INFO
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [main-EventThread] -
http://bjm7-lc453.jxq:45684 was granted leadership with
leaderSessionID=43f8a4f4-d3a6-48fb-afef-1f2f03ad5626
2018-12-08 21:12:04,188 INFO org.apache.flink.yarn.YarnResourceManager
[flink-akka.actor.default-dispatcher-281] - ResourceManager
akka.tcp://[email protected]:44815/user/resourcemanager was granted
leadership with fencing token acacde8ee4e115851f872189e7064971
2018-12-08 21:12:04,188 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager[flink-akka.actor.default-dispatcher-281]
- Starting the SlotManager.
2018-12-08 21:12:04,190 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher
[flink-akka.actor.default-dispatcher-218] - Dispatcher
akka.tcp://flink@****:***/user/dispatcher was granted leadership with fencing
token 8fc420d6-5526-41b0-ac0f-881437c55919
2018-12-08 21:12:04,190 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher
[flink-akka.actor.default-dispatcher-276] - Recovering all persisted jobs.
2018-12-08 21:12:04,624 INFO org.apache.flink.runtime.jobmaster.JobMaster
[flink-akka.actor.default-dispatcher-186] - Stopping the JobMaster for job
job_***(2a16bfa299b56432e1141df3b1361fbc).
2018-12-08 21:12:04,648 INFO
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore[flink-akka.actor.default-dispatcher-276]
- Recovered SubmittedJobGraph(2a16bfa299b56432e1141df3b1361fbc, null).
2018-12-08 21:12:04,665 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPool
[flink-akka.actor.default-dispatcher-242] - Stopping SlotPool.
2018-12-08 21:12:04,673 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService[flink-akka.actor.default-dispatcher-186]
- Stopping ZooKeeperLeaderElectionService
ZooKeeperLeaderElectionService{leaderPath='/leader/2a16bfa299b56432e1141df3b1361fbc/job_manager_lock'}.
2018-12-08 21:12:08,283 INFO
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore[flink-akka.actor.default-dispatcher-218]
- Released locks of job graph 2a16bfa299b56432e1141df3b1361fbc from ZooKeeper.
Later the job was cancelled, but when the AppMaster restarted, it tried to
recover the cancelled job and failed. Log:
2018-12-08 22:02:30,160 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[flink-akka.actor.default-dispatcher-670] - Fatal error occurred in the cluster
entrypoint.
java.lang.RuntimeException:
org.apache.flink.runtime.client.JobExecutionException: Could not set up
JobManager
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set
up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
at
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.lang.Exception: Cannot set up the user code libraries: File
does not exist:
/home/flink/data/flink/state/**/***/zk/**/blob/job_2a16bfa299b56432e1141df3b1361fbc/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
at
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:134)
... 10 more
Caused by: java.io.FileNotFoundException: File does not exist:
/home/flink/data/flink/state/***/****/zk/***/blob/job_2a16bfa299b56432e1141df3b1361fbc/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
at
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1211)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1199)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1189)
at
org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:275)
at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:242)
at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:235)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1487)
at
org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302)
at
org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:120)
at
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:37)
at
org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:102)
at
org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:84)
at
org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:493)
at
org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:444)
at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:417)
at
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
at
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:91)
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:131)
... 10 more
Caused by:
org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does
not exist:
/home/flink/data/flink/state/**/***/zk/***/blob/job_2a16bfa299b56432e1141df3b1361fbc/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
at
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)
at org.apache.hadoop.ipc.Client.call(Client.java:1470)
at org.apache.hadoop.ipc.Client.call(Client.java:1401)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy9.getBlockLocations(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1209)
... 31 more