tillrohrmann commented on a change in pull request #7889: [FLINK-11665] Wait
for job termination before recovery in Dispatcher
URL: https://github.com/apache/flink/pull/7889#discussion_r262443043
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
##########
@@ -324,6 +331,129 @@ public void testStandbyDispatcherJobRecovery() throws
Exception {
}
}
+ /**
+ * Tests that job store release happens before next local leader
recovers job from the store.
+ *
+ * <p>https://issues.apache.org/jira/browse/FLINK-11665.
+ */
+ @Test
+ public void testFinishedJobRemoveFromStoreAfterLeadershipChange()
throws Exception {
+ try (final TestingHighAvailabilityServices haServices = new
TestingHighAvailabilityServices();
+ final CuratorFramework curatorFramework =
ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+ final WaitingSubmittedJobGraphStore
submittedJobGraphStore = WaitingSubmittedJobGraphStore.wrap(
+
ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration));
+
haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+ final TestingLeaderElectionService
leaderElectionService = new TestingLeaderElectionService();
+
haServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+ final CompletableFuture<JobGraph> jobGraphFuture = new
CompletableFuture<>();
+ final CompletableFuture<ArchivedExecutionGraph>
resultFuture = new CompletableFuture<>();
+ final TestingDispatcher dispatcher = createDispatcher(
+ haServices,
+ new
TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture,
CompletableFuture.completedFuture(null)));
+
+ try {
+ dispatcher.start();
+
+
leaderElectionService.isLeader(UUID.randomUUID()).get();
+ final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ final JobGraph jobGraph =
DispatcherHATest.createNonEmptyJobGraph();
+ dispatcherGateway.submitJob(jobGraph,
TIMEOUT).get();
+
+ leaderElectionService.notLeader();
+ CompletableFuture<UUID> newLeader =
leaderElectionService.isLeader(UUID.randomUUID());
+ submittedJobGraphStore.awaitRecover();
+ submittedJobGraphStore.triggerRelease();
+ newLeader.get();
+
dispatcher.getRecoverOperationFuture(TIMEOUT).get();
+
+ resultFuture.complete(new
ArchivedExecutionGraphBuilder().setJobID(jobGraph.getJobID()).setState(JobStatus.FINISHED).build());
+
dispatcher.getJobTerminationFuture(jobGraph.getJobID(), TIMEOUT).get();
+
+
assertTrue(submittedJobGraphStore.isJobRemoved());
+
assertFalse(submittedJobGraphStore.getJobIds().contains(jobGraph.getJobID()));
+ } finally {
+ RpcUtils.terminateRpcEndpoint(dispatcher,
TIMEOUT);
+ }
+ }
+ }
+
+ private static class WaitingSubmittedJobGraphStore implements
SubmittedJobGraphStore {
+ private static final long WAIT_RECOVER_MILLI = 1000L;
+
+ private final SubmittedJobGraphStore delegate;
+ private final OneShotLatch recoverJobGraphLatch = new
OneShotLatch();
+ private final OneShotLatch releaseJobGraphLatch = new
OneShotLatch();
+ private volatile boolean jobRemoved;
+
+ private WaitingSubmittedJobGraphStore(SubmittedJobGraphStore
delegate) {
+ this.delegate = delegate;
+ }
+
+ static WaitingSubmittedJobGraphStore
wrap(SubmittedJobGraphStore delegate) {
+ return new WaitingSubmittedJobGraphStore(delegate);
+ }
+
+ @Nullable
+ @Override
+ public SubmittedJobGraph recoverJobGraph(JobID jobId) throws
Exception {
+ SubmittedJobGraph graph =
delegate.recoverJobGraph(jobId);
+ recoverJobGraphLatch.trigger();
+ return graph;
+ }
+
+ void awaitRecover() throws InterruptedException {
+ try {
+ recoverJobGraphLatch.await(WAIT_RECOVER_MILLI,
TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ // ignore
+ }
Review comment:
I would handle the `TimeoutException` in the test code itself, because
otherwise someone who is only looking at the test code might think that we
actually recover.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services