XComp commented on a change in pull request #18536:
URL: https://github.com/apache/flink/pull/18536#discussion_r796733983



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -169,28 +177,30 @@ public void testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
         // This will clear internal state of election service, so a new contender can register.
         leaderElectionService.stop();
 
+        assertThat(
+                "The JobGraph is still stored in the JobGraphStore.",
+                haServices.getJobGraphStore().getJobIds(),
+                CoreMatchers.is(Collections.singleton(jobId)));
+        assertThat(
+                "The JobResultStore has this job marked as dirty.",
+                haServices.getJobResultStore().getDirtyResults().stream()
+                        .map(JobResult::getJobId)
+                        .collect(Collectors.toSet()),
+                CoreMatchers.is(Collections.singleton(jobId)));
+
         // Run a second dispatcher, that restores our finished job.
         final Dispatcher secondDispatcher = createRecoveredDispatcher(null);
         toTerminate.add(secondDispatcher);
-        final DispatcherGateway secondDispatcherGateway =
-                secondDispatcher.getSelfGateway(DispatcherGateway.class);
+
+        // new Dispatcher becomes new leader
         leaderElectionService.isLeader(UUID.randomUUID());
 
-        // Now make sure that restored job started from checkpoint.
-        final JobMasterGateway secondJobMasterGateway =
-                connectToLeadingJobMaster(leaderElectionService).get();
-        try (final JobMasterTester tester =
-                new JobMasterTester(rpcService, jobId, secondJobMasterGateway)) {
-            final CompletableFuture<List<TaskDeploymentDescriptor>> descriptorsFuture =
-                    tester.deployVertices(2);
-            awaitStatus(secondDispatcherGateway, jobId, JobStatus.RUNNING);
-            final Optional<JobManagerTaskRestore> maybeRestore =
-                    descriptorsFuture.get().stream()
-                            .map(TaskDeploymentDescriptor::getTaskRestore)
-                            .filter(Objects::nonNull)
-                            .findAny();
-            assertTrue("Job has recovered from checkpoint.", maybeRestore.isPresent());
-        }
+        assertThrows(

Review comment:
       That would result in a timeout as well, wouldn't it? I guess that would
also be possible, but I didn't touch this code. The test is going to be
modified once more in the upstream PR anyway.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

