tillrohrmann commented on a change in pull request #7889: [FLINK-11665] Wait 
for job termination before recovery in Dispatcher
URL: https://github.com/apache/flink/pull/7889#discussion_r262443043
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
 ##########
 @@ -324,6 +331,129 @@ public void testStandbyDispatcherJobRecovery() throws 
Exception {
                }
        }
 
+       /**
+        * Tests that job store release happens before next local leader 
recovers job from the store.
+        *
+        * <p>https://issues.apache.org/jira/browse/FLINK-11665.
+        */
+       @Test
+       public void testFinishedJobRemoveFromStoreAfterLeadershipChange() 
throws Exception {
+               try (final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+                       final CuratorFramework curatorFramework = 
ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+                       final WaitingSubmittedJobGraphStore 
submittedJobGraphStore = WaitingSubmittedJobGraphStore.wrap(
+                               
ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration));
+                       
haServices.setSubmittedJobGraphStore(submittedJobGraphStore);
+                       final TestingLeaderElectionService 
leaderElectionService = new TestingLeaderElectionService();
+                       
haServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+                       final CompletableFuture<JobGraph> jobGraphFuture = new 
CompletableFuture<>();
+                       final CompletableFuture<ArchivedExecutionGraph> 
resultFuture = new CompletableFuture<>();
+                       final TestingDispatcher dispatcher = createDispatcher(
+                               haServices,
+                               new 
TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture, 
CompletableFuture.completedFuture(null)));
+
+                       try {
+                               dispatcher.start();
+
+                               
leaderElectionService.isLeader(UUID.randomUUID()).get();
+                               final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+                               final JobGraph jobGraph = 
DispatcherHATest.createNonEmptyJobGraph();
+                               dispatcherGateway.submitJob(jobGraph, 
TIMEOUT).get();
+
+                               leaderElectionService.notLeader();
+                               CompletableFuture<UUID> newLeader = 
leaderElectionService.isLeader(UUID.randomUUID());
+                               submittedJobGraphStore.awaitRecover();
+                               submittedJobGraphStore.triggerRelease();
+                               newLeader.get();
+                               
dispatcher.getRecoverOperationFuture(TIMEOUT).get();
+
+                               resultFuture.complete(new 
ArchivedExecutionGraphBuilder().setJobID(jobGraph.getJobID()).setState(JobStatus.FINISHED).build());
+                               
dispatcher.getJobTerminationFuture(jobGraph.getJobID(), TIMEOUT).get();
+
+                               
assertTrue(submittedJobGraphStore.isJobRemoved());
+                               
assertFalse(submittedJobGraphStore.getJobIds().contains(jobGraph.getJobID()));
+                       } finally {
+                               RpcUtils.terminateRpcEndpoint(dispatcher, 
TIMEOUT);
+                       }
+               }
+       }
+
+       private static class WaitingSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
+               private static final long WAIT_RECOVER_MILLI = 1000L;
+
+               private final SubmittedJobGraphStore delegate;
+               private final OneShotLatch recoverJobGraphLatch = new 
OneShotLatch();
+               private final OneShotLatch releaseJobGraphLatch = new 
OneShotLatch();
+               private volatile boolean jobRemoved;
+
+               private WaitingSubmittedJobGraphStore(SubmittedJobGraphStore 
delegate) {
+                       this.delegate = delegate;
+               }
+
+               static WaitingSubmittedJobGraphStore 
wrap(SubmittedJobGraphStore delegate) {
+                       return new WaitingSubmittedJobGraphStore(delegate);
+               }
+
+               @Nullable
+               @Override
+               public SubmittedJobGraph recoverJobGraph(JobID jobId) throws 
Exception {
+                       SubmittedJobGraph graph = 
delegate.recoverJobGraph(jobId);
+                       recoverJobGraphLatch.trigger();
+                       return graph;
+               }
+
+               void awaitRecover() throws InterruptedException {
+                       try {
+                               recoverJobGraphLatch.await(WAIT_RECOVER_MILLI, 
TimeUnit.MILLISECONDS);
+                       } catch (TimeoutException e) {
+                               // ignore
+                       }
 
 Review comment:
   I would handle the `TimeoutException` in the test code itself; otherwise, 
someone reading only the test code might assume that the recovery actually 
happened, even when `awaitRecover()` merely timed out.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to