Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5746#discussion_r177724449 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java --- @@ -344,6 +351,72 @@ public void testJobRecovery() throws Exception { assertThat(jobIds, contains(jobGraph.getJobID())); } + /** + * Tests that the {@link Dispatcher} terminates if it cannot recover jobs ids from + * the {@link SubmittedJobGraphStore}. See FLINK-8943. + */ + @Test + public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception { + final FlinkException testException = new FlinkException("Test exception"); + submittedJobGraphStore.setJobIdsFunction( + (Collection<JobID> jobIds) -> { + throw testException; + }); + + UUID expectedLeaderSessionId = UUID.randomUUID(); + + assertNull(dispatcherLeaderElectionService.getConfirmationFuture()); + + dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId); + + UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture() + .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + + assertEquals(expectedLeaderSessionId, actualLeaderSessionId); + + // we expect that a fatal error occurred + final Throwable error = fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + + assertThat(ExceptionUtils.findThrowableWithMessage(error, testException.getMessage()).isPresent(), is(true)); + + fatalErrorHandler.clearError(); + } + + /** + * Tests that the {@link Dispatcher} terminates if it cannot recover jobs from + * the {@link SubmittedJobGraphStore}. See FLINK-8943. 
+ */ + @Test + public void testFatalErrorAfterJobRecoveryFailure() throws Exception { + final FlinkException testException = new FlinkException("Test exception"); + + final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph, null); + submittedJobGraphStore.putJobGraph(submittedJobGraph); + + submittedJobGraphStore.setRecoverJobGraphFunction( + (JobID jobId, Map<JobID, SubmittedJobGraph> submittedJobs) -> { + throw testException; + }); + + UUID expectedLeaderSessionId = UUID.randomUUID(); --- End diff -- From this line on, the code appears to be duplicated from `testFatalErrorAfterJobIdRecoveryFailure`. Consider extracting the shared part (granting leadership, waiting for the leader-session confirmation, and asserting the expected fatal error) into a common helper method.
---