[ https://issues.apache.org/jira/browse/FLINK-8943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417239#comment-16417239 ]

ASF GitHub Bot commented on FLINK-8943:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5746#discussion_r177724449
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---
    @@ -344,6 +351,72 @@ public void testJobRecovery() throws Exception {
                assertThat(jobIds, contains(jobGraph.getJobID()));
        }
     
    +   /**
    +    * Tests that the {@link Dispatcher} terminates if it cannot recover jobs ids from
    +    * the {@link SubmittedJobGraphStore}. See FLINK-8943.
    +    */
    +   @Test
    +   public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
    +           final FlinkException testException = new FlinkException("Test exception");
    +           submittedJobGraphStore.setJobIdsFunction(
    +                   (Collection<JobID> jobIds) -> {
    +                           throw testException;
    +                   });
    +
    +           UUID expectedLeaderSessionId = UUID.randomUUID();
    +
    +           assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
    +
    +           dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
    +
    +           UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture()
    +                   .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    +
    +           assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
    +
    +           // we expect that a fatal error occurred
    +           final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    +
    +           assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()).isPresent(), is(true));
    +
    +           fatalErrorHandler.clearError();
    +   }
    +
    +   /**
    +    * Tests that the {@link Dispatcher} terminates if it cannot recover jobs from
    +    * the {@link SubmittedJobGraphStore}. See FLINK-8943.
    +    */
    +   @Test
    +   public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
    +           final FlinkException testException = new FlinkException("Test exception");
    +
    +           final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph, null);
    +           submittedJobGraphStore.putJobGraph(submittedJobGraph);
    +
    +           submittedJobGraphStore.setRecoverJobGraphFunction(
    +                   (JobID jobId, Map<JobID, SubmittedJobGraph> submittedJobs) -> {
    +                           throw testException;
    +                   });
    +
    +           UUID expectedLeaderSessionId = UUID.randomUUID();
    --- End diff --
    
    From here on, the code looks duplicated from `testFatalErrorAfterJobIdRecoveryFailure`.
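    
    One possible way to remove the duplication is to move the shared tail of
    both tests (grant leadership, wait for the confirmation, then wait for the
    fatal error) into a single helper. The sketch below is self-contained and
    only illustrates the shape of such a helper: the class
    `SharedFatalErrorCheck`, the method `grantLeadershipAndExpectFatalError`
    and its parameters are hypothetical and not part of the pull request, and
    the local `findThrowableWithMessage` is a simplified stand-in for the
    `ExceptionUtils` call used in the diff.

```java
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper class; its parameters stand in for the DispatcherTest fields. */
final class SharedFatalErrorCheck {

    private SharedFatalErrorCheck() {}

    /** Simplified stand-in for ExceptionUtils.findThrowableWithMessage: walks the cause chain. */
    static Optional<Throwable> findThrowableWithMessage(Throwable error, String message) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t.getMessage() != null && t.getMessage().contains(message)) {
                return Optional.of(t);
            }
        }
        return Optional.empty();
    }

    /**
     * Grants leadership, waits for the leader session confirmation and then
     * expects a fatal error whose cause chain contains the given message.
     */
    static void grantLeadershipAndExpectFatalError(
            Consumer<UUID> grantLeadership,
            Supplier<CompletableFuture<UUID>> confirmationFuture,
            CompletableFuture<Throwable> errorFuture,
            String expectedMessage,
            long timeoutMillis) throws Exception {

        final UUID expectedLeaderSessionId = UUID.randomUUID();
        grantLeadership.accept(expectedLeaderSessionId);

        // The confirmation future is fetched only after leadership was granted,
        // mirroring the order of calls in the diff.
        final UUID actualLeaderSessionId =
                confirmationFuture.get().get(timeoutMillis, TimeUnit.MILLISECONDS);
        if (!expectedLeaderSessionId.equals(actualLeaderSessionId)) {
            throw new AssertionError("Unexpected leader session id: " + actualLeaderSessionId);
        }

        // We expect that a fatal error occurred.
        final Throwable error = errorFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        if (!findThrowableWithMessage(error, expectedMessage).isPresent()) {
            throw new AssertionError("Fatal error does not contain: " + expectedMessage);
        }
    }
}
```

    With a helper along these lines, each test would only configure the
    failing `SubmittedJobGraphStore` function and then delegate to the shared
    check; `fatalErrorHandler.clearError()` could stay in the tests or move
    into the helper as well.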


> Jobs will not recover if DFS is temporarily unavailable
> -------------------------------------------------------
>
>                 Key: FLINK-8943
>                 URL: https://issues.apache.org/jira/browse/FLINK-8943
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0, 1.4.2, 1.6.0
>            Reporter: Gary Yao
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: flip6
>             Fix For: 1.5.0
>
>
> *Description*
> Job graphs are recovered from the DFS only once. If the DFS is 
> unavailable during the recovery attempt, the jobs will simply not run 
> until the master is restarted.
> *Steps to reproduce*
> # Submit a job on a Flink cluster with HDFS as the HA storage dir.
> # Trigger job recovery by killing the master.
> # Stop the HDFS NameNode.
> # Enable the HDFS NameNode after job recovery is over.
> # Verify that the job is not running.
> *Expected behavior*
> The new master should fail fast and exit. The new master should re-attempt 
> the recovery.
> *Stacktrace*
> {noformat}
> 2018-03-14 14:01:37,704 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Could not recover the job graph for a41d50b6f3ac16a730dd12792a847c97.
> org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under /a41d50b6f3ac16a730dd12792a847c97. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>       at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:192)
>       at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$recoverJobs$5(Dispatcher.java:557)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.net.ConnectException: Call From ip-172-31-43-54/172.31.43.54 to ip-172-31-32-118.eu-central-1.compute.internal:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>       at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:801)
>       at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
>       at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1435)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1345)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
>       at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
>       at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
>       at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
>       at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
>       at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:843)
>       at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:832)
>       at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:821)
>       at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:325)
>       at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:285)
>       at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:270)
>       at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1132)
>       at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:325)
>       at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:322)
>       at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>       at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:322)
>       at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
>       at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
>       at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
>       at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>       at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:64)
>       at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:57)
>       at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:186)
>       ... 7 more
> Caused by: java.net.ConnectException: Connection refused
>       at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>       at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>       at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
>       at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
>       at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:685)
>       at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:788)
>       at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:410)
>       at org.apache.hadoop.ipc.Client.getConnection(Client.java:1550)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1381)
>       ... 40 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
