GitHub user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5746#discussion_r177724449
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---
    @@ -344,6 +351,72 @@ public void testJobRecovery() throws Exception {
                assertThat(jobIds, contains(jobGraph.getJobID()));
        }
     
    +   /**
    +    * Tests that the {@link Dispatcher} terminates if it cannot recover jobs ids from
    +    * the {@link SubmittedJobGraphStore}. See FLINK-8943.
    +    */
    +   @Test
    +   public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
    +           final FlinkException testException = new FlinkException("Test exception");
    +           submittedJobGraphStore.setJobIdsFunction(
    +                   (Collection<JobID> jobIds) -> {
    +                           throw testException;
    +                   });
    +
    +           UUID expectedLeaderSessionId = UUID.randomUUID();
    +
    +           assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
    +
    +           dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
    +
    +           UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture()
    +                   .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    +
    +           assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
    +
    +           // we expect that a fatal error occurred
    +           final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    +
    +           assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()).isPresent(), is(true));
    +
    +           fatalErrorHandler.clearError();
    +   }
    +
    +   /**
    +    * Tests that the {@link Dispatcher} terminates if it cannot recover jobs from
    +    * the {@link SubmittedJobGraphStore}. See FLINK-8943.
    +    */
    +   @Test
    +   public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
    +           final FlinkException testException = new FlinkException("Test exception");
    +
    +           final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph, null);
    +           submittedJobGraphStore.putJobGraph(submittedJobGraph);
    +
    +           submittedJobGraphStore.setRecoverJobGraphFunction(
    +                   (JobID jobId, Map<JobID, SubmittedJobGraph> submittedJobs) -> {
    +                           throw testException;
    +                   });
    +
    +           UUID expectedLeaderSessionId = UUID.randomUUID();
    --- End diff --
    
    From here on, the code looks duplicated from `testFatalErrorAfterJobIdRecoveryFailure`; consider extracting the shared steps into a helper method.

