Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5746#discussion_r177732527
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
---
@@ -344,6 +351,72 @@ public void testJobRecovery() throws Exception {
assertThat(jobIds, contains(jobGraph.getJobID()));
}
+ /**
+ * Tests that the {@link Dispatcher} terminates if it cannot recover
job ids from
+ * the {@link SubmittedJobGraphStore}. See FLINK-8943.
+ */
+ @Test
+ public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
+ final FlinkException testException = new FlinkException("Test
exception");
+ submittedJobGraphStore.setJobIdsFunction(
+ (Collection<JobID> jobIds) -> {
+ throw testException;
+ });
+
+ UUID expectedLeaderSessionId = UUID.randomUUID();
+
+
assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
+
+
dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
+
+ UUID actualLeaderSessionId =
dispatcherLeaderElectionService.getConfirmationFuture()
+ .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
+
+ // we expect that a fatal error occurred
+ final Throwable error =
fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(),
TimeUnit.MILLISECONDS);
+
+ assertThat(ExceptionUtils.findThrowableWithMessage(error,
testException.getMessage()).isPresent(), is(true));
+
+ fatalErrorHandler.clearError();
+ }
+
+ /**
+ * Tests that the {@link Dispatcher} terminates if it cannot recover
jobs from
+ * the {@link SubmittedJobGraphStore}. See FLINK-8943.
+ */
+ @Test
+ public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
+ final FlinkException testException = new FlinkException("Test
exception");
+
+ final SubmittedJobGraph submittedJobGraph = new
SubmittedJobGraph(jobGraph, null);
+ submittedJobGraphStore.putJobGraph(submittedJobGraph);
+
+ submittedJobGraphStore.setRecoverJobGraphFunction(
+ (JobID jobId, Map<JobID, SubmittedJobGraph>
submittedJobs) -> {
+ throw testException;
+ });
+
+ UUID expectedLeaderSessionId = UUID.randomUUID();
--- End diff --
I think I fixed it in b83b280ce2fb493eb647ffa589613c0b0362f39a, which is
part of #5774.
---