tillrohrmann commented on a change in pull request #7889: [FLINK-11665] Wait
for job termination before recovery in Dispatcher
URL: https://github.com/apache/flink/pull/7889#discussion_r262442875
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
##########
@@ -324,6 +331,129 @@ public void testStandbyDispatcherJobRecovery() throws Exception {
         }
     }
+    /**
+     * Tests that job store release happens before next local leader recovers job from the store.
+     *
+     * <p>https://issues.apache.org/jira/browse/FLINK-11665.
+     */
+    @Test
+    public void testFinishedJobRemoveFromStoreAfterLeadershipChange() throws Exception {
+        try (final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+            final CuratorFramework curatorFramework = ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+            final WaitingSubmittedJobGraphStore submittedJobGraphStore = WaitingSubmittedJobGraphStore.wrap(
+                ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration));
+            haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+            final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+            haServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+            final CompletableFuture<JobGraph> jobGraphFuture = new CompletableFuture<>();
+            final CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();
+            final TestingDispatcher dispatcher = createDispatcher(
+                haServices,
+                new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture, CompletableFuture.completedFuture(null)));
+
+            try {
+                dispatcher.start();
+
+                leaderElectionService.isLeader(UUID.randomUUID()).get();
+                final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+                final JobGraph jobGraph = DispatcherHATest.createNonEmptyJobGraph();
+                dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+                leaderElectionService.notLeader();
+                CompletableFuture<UUID> newLeader = leaderElectionService.isLeader(UUID.randomUUID());
+                submittedJobGraphStore.awaitRecover();
+                submittedJobGraphStore.triggerRelease();
+                newLeader.get();
+                dispatcher.getRecoverOperationFuture(TIMEOUT).get();
+
+                resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobGraph.getJobID()).setState(JobStatus.FINISHED).build());
+                dispatcher.getJobTerminationFuture(jobGraph.getJobID(), TIMEOUT).get();
+
+                assertTrue(submittedJobGraphStore.isJobRemoved());
+                assertFalse(submittedJobGraphStore.getJobIds().contains(jobGraph.getJobID()));
+            } finally {
+                RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT);
+            }
+        }
+    }
+
+    private static class WaitingSubmittedJobGraphStore implements SubmittedJobGraphStore {
+        private static final long WAIT_RECOVER_MILLI = 1000L;
Review comment:
The waiting time is too long. It should be OK to set it to something like
`10-50` milliseconds.
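
To make the cost of the wait constant concrete, here is a minimal, hypothetical sketch. It is not the PR's `WaitingSubmittedJobGraphStore`; the class `ShortWaitSketch`, the method `awaitWithin`, and the 20 ms value are made up for illustration. It assumes the helper blocks on a latch with a timeout: whenever the awaited event does not fire, the caller pays the full timeout, so a value in the suggested 10-50 ms range keeps a test run short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of Flink.
class ShortWaitSketch {

    // Assumed value inside the 10-50 ms range suggested in the review.
    static final long WAIT_RECOVER_MILLI = 20L;

    // Returns true if the latch was released within the timeout, false otherwise.
    static boolean awaitWithin(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "not released".
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch recovered = new CountDownLatch(1);

        // The event has not fired, so this call blocks for the whole timeout.
        long start = System.nanoTime();
        boolean firedEarly = awaitWithin(recovered, WAIT_RECOVER_MILLI);
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("fired early: " + firedEarly + ", waited about " + elapsedMillis + " ms");

        // Once the event fires, the same call returns immediately.
        recovered.countDown();
        System.out.println("fired after countDown: " + awaitWithin(recovered, WAIT_RECOVER_MILLI));
    }
}
```

With a 1000 ms constant the first call above would block for a full second on every run; the shorter value gives the same ordering check at a fraction of the cost, at the price of a slightly higher chance of missing a slow event on a loaded machine.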