[
https://issues.apache.org/jira/browse/FLINK-11225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Till Rohrmann closed FLINK-11225.
---------------------------------
Resolution: Duplicate
> Error state of addedJobGraphs when the Dispatcher concurrently revokes and
> grants leadership
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-11225
> URL: https://issues.apache.org/jira/browse/FLINK-11225
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.6.2
> Environment: flink 1.6.2 on yarn
> Reporter: dongtingting
> Priority: Major
>
> In some cases, e.g. when the AppMaster goes through a long full GC pause, the
> Dispatcher is revoked leadership and immediately granted it again. This can
> cause Dispatcher 'revokeLeadership' and 'grantLeadership' to run concurrently.
> Then ZooKeeperSubmittedJobGraphStore may execute 'recoverJobGraph' before
> 'releaseJobGraph', so addedJobGraphs in ZooKeeperSubmittedJobGraphStore no
> longer contains the running job. Later, when we cancel the running job, the
> job graph cannot be removed from ZooKeeper under $basedir/jobgraphs/$job_id.
> If the AppMaster restarts, it will recover the cancelled job.
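>
> The following minimal sketch (not actual Flink code; class, field, and method
> names are chosen for illustration only) shows how this interleaving can leave
> the store's local cache without the running job:
> {code:java}
> import java.util.HashSet;
> import java.util.Set;
>
> // Simplified model of the local cache kept by a submitted-job-graph store.
> // Illustrates the race only; this is not the actual Flink implementation.
> public class JobGraphCacheRace {
>
>     // Stands in for addedJobGraphs: the job graphs this store instance
>     // currently considers itself responsible for.
>     private final Set<String> addedJobGraphs = new HashSet<>();
>
>     // Runs on the grantLeadership path: the recovered job is registered.
>     synchronized void recoverJobGraph(String jobId) {
>         addedJobGraphs.add(jobId);
>     }
>
>     // Runs on the revokeLeadership path: the registration is dropped and
>     // the lock on the job graph node is released.
>     synchronized void releaseJobGraph(String jobId) {
>         addedJobGraphs.remove(jobId);
>     }
>
>     boolean contains(String jobId) {
>         return addedJobGraphs.contains(jobId);
>     }
>
>     public static void main(String[] args) {
>         String jobId = "2a16bfa299b56432e1141df3b1361fbc";
>
>         // Expected order: the revoke finishes (releaseJobGraph) before the
>         // new leadership recovers the job -> the job stays in the cache.
>         JobGraphCacheRace expected = new JobGraphCacheRace();
>         expected.releaseJobGraph(jobId);
>         expected.recoverJobGraph(jobId);
>         System.out.println("expected order, cache contains job: " + expected.contains(jobId)); // true
>
>         // Order seen in the log below: grantLeadership recovers the job
>         // first, then the delayed revokeLeadership releases it again, so
>         // the cache loses the entry although the job keeps running.
>         JobGraphCacheRace observed = new JobGraphCacheRace();
>         observed.recoverJobGraph(jobId);
>         observed.releaseJobGraph(jobId);
>         System.out.println("observed order, cache contains job: " + observed.contains(jobId)); // false
>     }
> }
> {code}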
> AppMaster log for this case:
> 2018-12-08 21:12:03,729 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn[main-SendThread(******:2181)]
> - *Client session timed out*, have not heard from server in 40082ms for
> sessionid 0x1657682ceee6082, closing socket connection and attempting
> reconnect
> 2018-12-08 21:12:03,978 INFO
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager[main-EventThread]
> - State change: SUSPENDED
> 2018-12-08 21:12:03,980 WARN
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[Curator-ConnectionStateManager-0]
> - Connection to ZooKeeper suspended. Can no longer retrieve the leader from
> ZooKeeper.
> 2018-12-08 21:12:03,980 WARN
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService[Curator-ConnectionStateManager-0]
> - Connection to ZooKeeper suspended. The contender
> [http://***:***|http://%2A%2A%2A%2A%2A%2A/] no longer participates in the
> leader election.
> 2018-12-08 21:12:03,980 WARN
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[Curator-ConnectionStateManager-0]
> - Connection to ZooKeeper suspended. Can no longer retrieve the leader from
> ZooKeeper.
> 2018-12-08 21:12:03,981 WARN
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService[Curator-ConnectionStateManager-0]
> - Connection to ZooKeeper suspended. The contender
> akka.tcp://[email protected]:44815/user/resourcemanager no longer
> participates in the leader election.
> 2018-12-08 21:12:03,982 INFO
> org.apache.flink.runtime.jobmaster.JobManagerRunner
> [Curator-ConnectionStateManager-0] - JobManager for job job_***
> (2a16bfa299b56432e1141df3b1361fbc) was *revoked* *leadership* at
> akka.tcp://flink@****:***/user/jobmanager_0.
> 2018-12-08 21:12:03,984 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[flink-akka.actor.default-dispatcher-186]
> - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> 2018-12-08 21:12:03,986 INFO
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint
> [Curator-ConnectionStateManager-0] -
> [http://****:***|http://%2A%2A%2A%2A%2A%2A%2A/] lost leadership
> 2018-12-08 21:12:03,986 INFO org.apache.flink.yarn.YarnResourceManager
> [flink-akka.actor.default-dispatcher-287] - ResourceManager
> akka.tcp://flink@****:***/user/resourcemanager was *revoked* *leadership*.
> Clearing fencing token.
> 2018-12-08 21:12:03,986 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[flink-akka.actor.default-dispatcher-287]
> - Stopping ZooKeeperLeaderRetrievalService
> /leader/2a16bfa299b56432e1141df3b1361fbc/job_manager_lock.
> 2018-12-08 21:12:03,990 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> [flink-akka.actor.default-dispatcher-281] - *Dispatcher*
> akka.tcp://[email protected]:44815/user/dispatcher was *revoked
> leadership.*
> 2018-12-08 21:12:03,990 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> [flink-akka.actor.default-dispatcher-281] - Stopping all currently running
> jobs of dispatcher akka.tcp://flink@****:***/user/dispatcher.
> 2018-12-08 21:12:04,181 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn[main-SendThread(10.54.33.12:2181)]
> - Session establishment complete on server ****/****:2181, sessionid =
> 0x1657682ceee6082, negotiated timeout = 60000
> 2018-12-08 21:12:04,181 INFO
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager[main-EventThread]
> - State change: *RECONNECTED*
> 2018-12-08 21:12:04,188 INFO
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [main-EventThread]
> - [http://bjm7-lc453.jxq:45684|http://bjm7-lc453.jxq:45684/] *was granted
> leadership* with leaderSessionID=43f8a4f4-d3a6-48fb-afef-1f2f03ad5626
> 2018-12-08 21:12:04,188 INFO org.apache.flink.yarn.YarnResourceManager
> [flink-akka.actor.default-dispatcher-281] - ResourceManager
> akka.tcp://[email protected]:44815/user/resourcemanager was granted
> leadership with fencing token acacde8ee4e115851f872189e7064971
> 2018-12-08 21:12:04,188 INFO
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager[flink-akka.actor.default-dispatcher-281]
> - Starting the SlotManager.
> 2018-12-08 21:12:04,190 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> [flink-akka.actor.default-dispatcher-218] - Dispatcher
> akka.tcp://flink@****:***/user/dispatcher was granted leadership with fencing
> token 8fc420d6-5526-41b0-ac0f-881437c55919
> 2018-12-08 21:12:04,190 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> [flink-akka.actor.default-dispatcher-276] - *Recovering all persisted jobs*.
> 2018-12-08 21:12:04,624 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [flink-akka.actor.default-dispatcher-186] - Stopping the JobMaster for job
> job_***(2a16bfa299b56432e1141df3b1361fbc).
> 2018-12-08 21:12:04,648 INFO
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore[flink-akka.actor.default-dispatcher-276]
> - {color:#ff0000}Recovered
> SubmittedJobGraph(2a16bfa299b56432e1141df3b1361fbc, null){color}.
> 2018-12-08 21:12:04,665 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool
> [flink-akka.actor.default-dispatcher-242] - Stopping SlotPool.
> 2018-12-08 21:12:04,673 INFO
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService[flink-akka.actor.default-dispatcher-186]
> - Stopping ZooKeeperLeaderElectionService
> ZooKeeperLeaderElectionService{leaderPath='/leader/2a16bfa299b56432e1141df3b1361fbc/job_manager_lock'}.
> 2018-12-08 21:12:08,283 INFO
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore[flink-akka.actor.default-dispatcher-218]
> - {color:#ff0000}Released locks of job graph
> 2a16bfa299b56432e1141df3b1361fbc{color} from ZooKeeper.
>
> At this point, addedJobGraphs in ZooKeeperSubmittedJobGraphStore no longer
> contains the running job 2a16bfa299b56432e1141df3b1361fbc.
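>
> A hypothetical continuation of the sketch above (again not the actual
> implementation; 'deleteJobGraphNode' is a placeholder) of why the subsequent
> cancel cannot clean up ZooKeeper: if the removal is guarded by the same local
> cache, it becomes a silent no-op once the cache has lost the entry, so the
> znode under $basedir/jobgraphs/$job_id survives while other cleanup of the
> cancelled job (e.g. its blobs) proceeds, which would explain the
> FileNotFoundException below:
> {code:java}
>     // Hypothetical extension of JobGraphCacheRace above, not Flink code.
>     synchronized void removeJobGraph(String jobId) {
>         // If the cache lost the entry during the racy revoke/grant
>         // interleaving, this guard is never entered and the job graph node
>         // stays in ZooKeeper even after the job has been cancelled.
>         if (addedJobGraphs.contains(jobId)) {
>             deleteJobGraphNode(jobId); // placeholder for the ZooKeeper delete
>             addedJobGraphs.remove(jobId);
>         }
>     }
>
>     // Placeholder; in this sketch it only prints what would be deleted.
>     private void deleteJobGraphNode(String jobId) {
>         System.out.println("would delete /jobgraphs/" + jobId);
>     }
> {code}
>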
> Later the job is cancelled, but when the AppMaster restarts, it tries to
> recover the cancelled job and fails. AppMaster log:
>
> 2018-12-08 22:02:30,160 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint
> [flink-akka.actor.default-dispatcher-670] - Fatal error occurred in the
> cluster entrypoint.
> java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobExecutionException: Could not set up
> JobManager
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
> set up JobManager
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> ... 7 more
> Caused by: java.lang.Exception: Cannot set up the user code libraries: File
> does not exist:
> /home/flink/data/flink/state/**/***/zk/**/blob/job_2a16bfa299b56432e1141df3b1361fbc/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef
> at
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
> at
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
> at
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:134)
> ... 10 more
> Caused by: java.io.FileNotFoundException: File does not exist:
> /home/flink/data/flink/state/***/****/zk/***/blob/job_{color:#ff0000}2a16bfa299b56432e1141df3b1361fbc{color}/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef
> at
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
> at
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
> at
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
> at
> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
> at
> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1211)
> at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1199)
> at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1189)
> at
> org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:275)
> at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:242)
> at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:235)
> at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1487)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:120)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:37)
> at
> org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:102)
> at
> org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:84)
> at
> org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:493)
> at
> org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:444)
> at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:417)
> at
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
> at
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:91)
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:131)
> ... 10 more
> Caused by:
> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File
> does not exist:
> /home/flink/data/flink/state/**/***/zk/***/blob/{color:#ff0000}job_2a16bfa299b56432e1141df3b1361fbc{color}/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef
> at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
> at
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
> at
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)
> at org.apache.hadoop.ipc.Client.call(Client.java:1470)
> at org.apache.hadoop.ipc.Client.call(Client.java:1401)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> at com.sun.proxy.$Proxy9.getBlockLocations(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:254)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
> at
> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1209)
> ... 31 more
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)