[ https://issues.apache.org/jira/browse/FLINK-8943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417272#comment-16417272 ]
ASF GitHub Bot commented on FLINK-8943:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5746#discussion_r177732527
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---
@@ -344,6 +351,72 @@ public void testJobRecovery() throws Exception {
assertThat(jobIds, contains(jobGraph.getJobID()));
}
+	/**
+	 * Tests that the {@link Dispatcher} terminates if it cannot recover job ids from
+	 * the {@link SubmittedJobGraphStore}. See FLINK-8943.
+	 */
+	@Test
+	public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
+		final FlinkException testException = new FlinkException("Test exception");
+		submittedJobGraphStore.setJobIdsFunction(
+			(Collection<JobID> jobIds) -> {
+				throw testException;
+			});
+
+		UUID expectedLeaderSessionId = UUID.randomUUID();
+
+		assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
+
+		dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
+
+		UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture()
+			.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+		assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
+
+		// we expect that a fatal error occurred
+		final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+		assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()).isPresent(), is(true));
+
+		fatalErrorHandler.clearError();
+	}
+
+	/**
+	 * Tests that the {@link Dispatcher} terminates if it cannot recover jobs from
+	 * the {@link SubmittedJobGraphStore}. See FLINK-8943.
+	 */
+	@Test
+	public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
+		final FlinkException testException = new FlinkException("Test exception");
+
+		final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph, null);
+		submittedJobGraphStore.putJobGraph(submittedJobGraph);
+
+		submittedJobGraphStore.setRecoverJobGraphFunction(
+			(JobID jobId, Map<JobID, SubmittedJobGraph> submittedJobs) -> {
+				throw testException;
+			});
+
+		UUID expectedLeaderSessionId = UUID.randomUUID();
--- End diff --
I think I fixed it in b83b280ce2fb493eb647ffa589613c0b0362f39a which is
part of #5774
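
For readers following along: the tests above inject failures through a testing job graph store whose lookup functions can be overridden per test. Below is a minimal, self-contained sketch of that failure-injection pattern; the names {{FaultySubmittedJobGraphStore}} and {{ThrowingFunction}} are illustrative assumptions, not necessarily what the PR uses.
{code:java}
// Minimal sketch of a failure-injecting job graph store for tests.
// FaultySubmittedJobGraphStore and ThrowingFunction are illustrative
// names, not necessarily those used in the PR.

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;

public class FaultySubmittedJobGraphStore {

	/** Hypothetical throwing function type, so tests can inject exceptions. */
	@FunctionalInterface
	public interface ThrowingFunction<T, R> {
		R apply(T value) throws Exception;
	}

	private final Map<JobID, SubmittedJobGraph> storedJobGraphs = new HashMap<>();

	// Defaults to the normal behavior; tests replace it with a throwing lambda.
	private ThrowingFunction<Collection<JobID>, Collection<JobID>> jobIdsFunction = ids -> ids;

	public void putJobGraph(SubmittedJobGraph jobGraph) {
		storedJobGraphs.put(jobGraph.getJobId(), jobGraph);
	}

	public void setJobIdsFunction(ThrowingFunction<Collection<JobID>, Collection<JobID>> function) {
		this.jobIdsFunction = function;
	}

	public Collection<JobID> getJobIds() throws Exception {
		// Routed through the injected function so a test can make this call fail.
		return jobIdsFunction.apply(new ArrayList<>(storedJobGraphs.keySet()));
	}
}
{code}
In the first test above, {{setJobIdsFunction}} is handed a lambda that throws the prepared {{FlinkException}}, and the assertion then checks that exactly this exception reaches the fatal error handler.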
> Jobs will not recover if DFS is temporarily unavailable
> -------------------------------------------------------
>
> Key: FLINK-8943
> URL: https://issues.apache.org/jira/browse/FLINK-8943
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.5.0, 1.4.2, 1.6.0
> Reporter: Gary Yao
> Assignee: Till Rohrmann
> Priority: Blocker
> Labels: flip6
> Fix For: 1.5.0
>
>
> *Description*
> Job graphs are recovered from the DFS only once. If the DFS is unavailable
> during that single recovery attempt, the jobs will simply not be running
> until the master is restarted again.
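> The problematic pattern looks roughly like the following (a sketch only; names and structure are assumptions, not the actual Dispatcher code):
> {code:java}
> // Sketch of the problem: the exception is logged but never escalated,
> // so the master keeps running while the job is silently missing.
> try {
> 	final SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
> 	submitJob(submittedJobGraph.getJobGraph());
> } catch (Exception e) {
> 	log.error("Could not recover the job graph for {}.", jobId, e);
> 	// no retry and no fatal error: the job stays unrecovered
> }
> {code}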
> *Steps to reproduce*
> # Submit a job to a Flink cluster with HDFS as the HA storage directory.
> # Trigger job recovery by killing the master.
> # Stop the HDFS NameNode.
> # Start the HDFS NameNode again after the job recovery attempt is over.
> # Verify that the job is not running.
> *Expected behavior*
> The new master should fail fast and exit, so that a freshly started master
> can re-attempt the recovery.
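> A rough sketch of the intended fail-fast behavior (illustrative only; method and field names are assumptions, and the actual fix is in the referenced PRs):
> {code:java}
> // Sketch: escalate any recovery failure to the fatal error handler so the
> // process exits and an external supervisor can restart the master.
> private void recoverJobs() {
> 	try {
> 		for (JobID jobId : submittedJobGraphStore.getJobIds()) {
> 			submitJob(submittedJobGraphStore.recoverJobGraph(jobId).getJobGraph());
> 		}
> 	} catch (Exception e) {
> 		fatalErrorHandler.onFatalError(
> 			new FlinkException("Could not recover persisted job graphs.", e));
> 	}
> }
> {code}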
> *Stacktrace*
> {noformat}
> 2018-03-14 14:01:37,704 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Could not recover the job graph for a41d50b6f3ac16a730dd12792a847c97.
> org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under /a41d50b6f3ac16a730dd12792a847c97. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
> 	at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:192)
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$recoverJobs$5(Dispatcher.java:557)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.net.ConnectException: Call From ip-172-31-43-54/172.31.43.54 to ip-172-31-32-118.eu-central-1.compute.internal:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> 	at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:801)
> 	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
> 	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1435)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1345)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> 	at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
> 	at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:843)
> 	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:832)
> 	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:821)
> 	at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:325)
> 	at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:285)
> 	at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:270)
> 	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1132)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:325)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:322)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:322)
> 	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
> 	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
> 	at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:64)
> 	at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:57)
> 	at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:186)
> 	... 7 more
> Caused by: java.net.ConnectException: Connection refused
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> 	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
> 	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:685)
> 	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:788)
> 	at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:410)
> 	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1550)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1381)
> 	... 40 more
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)