[ https://issues.apache.org/jira/browse/FLINK-8943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann reassigned FLINK-8943:
------------------------------------

    Assignee: Till Rohrmann

> Jobs will not recover if DFS is temporarily unavailable
> -------------------------------------------------------
>
>                 Key: FLINK-8943
>                 URL: https://issues.apache.org/jira/browse/FLINK-8943
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0, 1.4.2, 1.6.0
>            Reporter: Gary Yao
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: flip6
>             Fix For: 1.5.0
>
>
> *Description*
> Job graphs are recovered from the DFS only once. If the DFS is unavailable 
> during the recovery attempt, the jobs will simply not be running until the 
> master is restarted again.
> *Steps to reproduce*
> # Submit a job on a Flink cluster with HDFS as the HA storage directory.
> # Trigger job recovery by killing the master.
> # Stop the HDFS NameNode.
> # Start the HDFS NameNode again after the job recovery attempt is over.
> # Verify that the job is not running.
> *Expected behavior*
> The new master should fail fast and exit. After the restart, the new master 
> should re-attempt the recovery.
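> As a rough illustration of that behavior, here is a minimal, hypothetical retry 
> sketch in plain Java (not Flink's actual implementation; class and parameter 
> names are made up). The idea is that the recovery call, e.g. 
> ZooKeeperSubmittedJobGraphStore#recoverJobGraph, is re-attempted a bounded 
> number of times with a fixed backoff and only then allowed to fail fast.
> {code:java}
> import java.util.concurrent.Callable;
>
> /**
>  * Minimal sketch only: re-attempt an operation (such as recovering a job
>  * graph from the HA store) a bounded number of times before failing fast.
>  * This is not Flink's actual fix; names and parameters are illustrative.
>  */
> public final class RecoveryRetrySketch {
>
>     static <T> T retry(Callable<T> operation, int maxAttempts, long backoffMillis)
>             throws Exception {
>         Exception lastFailure = null;
>         for (int attempt = 1; attempt <= maxAttempts; attempt++) {
>             try {
>                 return operation.call();
>             } catch (Exception e) {
>                 // e.g. the DFS/NameNode is temporarily unreachable
>                 lastFailure = e;
>                 Thread.sleep(backoffMillis);
>             }
>         }
>         if (lastFailure == null) {
>             throw new IllegalArgumentException("maxAttempts must be at least 1");
>         }
>         // retry budget exhausted: fail fast so the process can be restarted
>         throw lastFailure;
>     }
>
>     public static void main(String[] args) throws Exception {
>         // Hypothetical stand-in for the job graph recovery call, which in the
>         // real code reads the serialized job graph (state handle) from the DFS.
>         Callable<String> recoverJobGraph =
>                 () -> "jobGraph(a41d50b6f3ac16a730dd12792a847c97)";
>
>         System.out.println(retry(recoverJobGraph, 5, 1_000L));
>     }
> }
> {code}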
> *Stacktrace*
> {noformat}
> 2018-03-14 14:01:37,704 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Could not recover the job graph for a41d50b6f3ac16a730dd12792a847c97.
> org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under /a41d50b6f3ac16a730dd12792a847c97. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>       at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:192)
>       at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$recoverJobs$5(Dispatcher.java:557)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.net.ConnectException: Call From ip-172-31-43-54/172.31.43.54 to ip-172-31-32-118.eu-central-1.compute.internal:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>       at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:801)
>       at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
>       at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1435)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1345)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
>       at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
>       at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:843)
>       at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:832)
>       at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:821)
>       at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:325)
>       at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:285)
>       at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:270)
>       at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1132)
>       at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:325)
>       at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:322)
>       at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:322)
>       at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
>       at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
>       at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
>       at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>       at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:64)
>       at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:57)
>       at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:186)
>       ... 7 more
> Caused by: java.net.ConnectException: Connection refused
>       at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>       at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>       at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
>       at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
>       at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:685)
>       at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:788)
>       at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:410)
>       at org.apache.hadoop.ipc.Client.getConnection(Client.java:1550)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1381)
>       ... 40 more
> {noformat}


