tillrohrmann commented on a change in pull request #7889: [FLINK-11665] Wait 
for job termination before recovery in Dispatcher
URL: https://github.com/apache/flink/pull/7889#discussion_r262443984
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
 ##########
 @@ -324,6 +331,129 @@ public void testStandbyDispatcherJobRecovery() throws 
Exception {
                }
        }
 
+       /**
+        * Tests that job store release happens before next local leader 
recovers job from the store.
+        *
+        * <p>https://issues.apache.org/jira/browse/FLINK-11665.
+        */
+       @Test
+       public void testFinishedJobRemoveFromStoreAfterLeadershipChange() 
throws Exception {
+               try (final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+                       final CuratorFramework curatorFramework = 
ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+                       final WaitingSubmittedJobGraphStore 
submittedJobGraphStore = WaitingSubmittedJobGraphStore.wrap(
+                               
ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration));
+                       
haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+                       final TestingLeaderElectionService 
leaderElectionService = new TestingLeaderElectionService();
+                       
haServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+                       final CompletableFuture<JobGraph> jobGraphFuture = new 
CompletableFuture<>();
+                       final CompletableFuture<ArchivedExecutionGraph> 
resultFuture = new CompletableFuture<>();
+                       final TestingDispatcher dispatcher = createDispatcher(
+                               haServices,
+                               new 
TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture, 
CompletableFuture.completedFuture(null)));
+
+                       try {
+                               dispatcher.start();
+
+                               
leaderElectionService.isLeader(UUID.randomUUID()).get();
+                               final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+                               final JobGraph jobGraph = 
DispatcherHATest.createNonEmptyJobGraph();
+                               dispatcherGateway.submitJob(jobGraph, 
TIMEOUT).get();
+
+                               leaderElectionService.notLeader();
+                               CompletableFuture<UUID> newLeader = 
leaderElectionService.isLeader(UUID.randomUUID());
+                               submittedJobGraphStore.awaitRecover();
+                               submittedJobGraphStore.triggerRelease();
+                               newLeader.get();
+                               
dispatcher.getRecoverOperationFuture(TIMEOUT).get();
+
+                               resultFuture.complete(new 
ArchivedExecutionGraphBuilder().setJobID(jobGraph.getJobID()).setState(JobStatus.FINISHED).build());
+                               
dispatcher.getJobTerminationFuture(jobGraph.getJobID(), TIMEOUT).get();
+
+                               
assertTrue(submittedJobGraphStore.isJobRemoved());
+                               
assertFalse(submittedJobGraphStore.getJobIds().contains(jobGraph.getJobID()));
 
 Review comment:
   I would suggest to use hamcrest assertions in the future because they give 
better error messages and are more expressive.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to