tillrohrmann commented on a change in pull request #7889: [FLINK-11665] Wait for job termination before recovery in Dispatcher
URL: https://github.com/apache/flink/pull/7889#discussion_r262443984
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
##########
@@ -324,6 +331,129 @@ public void testStandbyDispatcherJobRecovery() throws Exception {
}
}
+	/**
+	 * Tests that job store release happens before next local leader recovers job from the store.
+	 *
+	 * <p>https://issues.apache.org/jira/browse/FLINK-11665.
+	 */
+	@Test
+	public void testFinishedJobRemoveFromStoreAfterLeadershipChange() throws Exception {
+		try (final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+			final CuratorFramework curatorFramework = ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+			final WaitingSubmittedJobGraphStore submittedJobGraphStore = WaitingSubmittedJobGraphStore.wrap(
+				ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration));
+
+			haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+			final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+			haServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+			final CompletableFuture<JobGraph> jobGraphFuture = new CompletableFuture<>();
+			final CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();
+			final TestingDispatcher dispatcher = createDispatcher(
+				haServices,
+				new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture, CompletableFuture.completedFuture(null)));
+
+			try {
+				dispatcher.start();
+
+				leaderElectionService.isLeader(UUID.randomUUID()).get();
+				final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+				final JobGraph jobGraph = DispatcherHATest.createNonEmptyJobGraph();
+				dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+				leaderElectionService.notLeader();
+				CompletableFuture<UUID> newLeader = leaderElectionService.isLeader(UUID.randomUUID());
+				submittedJobGraphStore.awaitRecover();
+				submittedJobGraphStore.triggerRelease();
+				newLeader.get();
+				dispatcher.getRecoverOperationFuture(TIMEOUT).get();
+
+				resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobGraph.getJobID()).setState(JobStatus.FINISHED).build());
+				dispatcher.getJobTerminationFuture(jobGraph.getJobID(), TIMEOUT).get();
+
+				assertTrue(submittedJobGraphStore.isJobRemoved());
+				assertFalse(submittedJobGraphStore.getJobIds().contains(jobGraph.getJobID()));
Review comment:
I would suggest using Hamcrest assertions in the future because they give
better error messages and are more expressive.
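
As a rough illustration of the suggestion, here is a minimal, standalone sketch. It assumes Hamcrest is on the classpath; the class name HamcrestAssertionSketch, the helper jobIds(), and the string job IDs are hypothetical stand-ins for the store and JobID used in the test, not code from the PR.

```java
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

import java.util.Arrays;
import java.util.Collection;

public class HamcrestAssertionSketch {

    // Hypothetical stand-in for submittedJobGraphStore.getJobIds();
    // plain strings replace Flink's JobID to keep the sketch standalone.
    public static Collection<String> jobIds() {
        return Arrays.asList("job-a", "job-b");
    }

    // Runs a deliberately failing Hamcrest assertion and returns its message,
    // which describes both the expectation and the actual collection.
    public static String failureMessage() {
        try {
            assertThat(jobIds(), not(hasItem("job-a")));
            return "";
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Matcher-style counterpart of assertTrue(flag).
        assertThat(true, is(true));

        // Matcher-style counterpart of assertFalse(collection.contains(x)).
        assertThat(jobIds(), not(hasItem("job-c")));

        // Print the message Hamcrest produces for the failing case.
        System.out.println(failureMessage());
    }
}
```

Applied to this test, the two assertions could presumably read assertThat(submittedJobGraphStore.isJobRemoved(), is(true)) and assertThat(submittedJobGraphStore.getJobIds(), not(hasItem(jobGraph.getJobID()))). On failure, the second form reports the expected condition and the actual contents of the collection, whereas assertFalse raises a bare AssertionError with no detail.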