This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit d1dcb68abac2ef781a27f28edd055cfe3efa87e8 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Fri Aug 17 15:24:45 2018 +0200 [hotfix][tests] Ensure that JobManagerRunners are stopped when Dispatcher loses leadership --- .../flink/runtime/dispatcher/DispatcherHATest.java | 75 ++++++++++++++++++---- 1 file changed, 64 insertions(+), 11 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java index adf7618..cb26f48 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java @@ -30,9 +30,11 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; @@ -111,8 +113,6 @@ public class DispatcherHATest extends TestLogger { */ @Test public void testGrantingRevokingLeadership() throws Exception { - - final Configuration configuration = new Configuration(); final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); final JobGraph nonEmptyJobGraph = createNonEmptyJobGraph(); final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(nonEmptyJobGraph, null); @@ -125,7 +125,34 @@ public class DispatcherHATest extends TestLogger { final BlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2); - final HATestingDispatcher dispatcher = new HATestingDispatcher( + final HATestingDispatcher dispatcher = createHADispatcher(highAvailabilityServices, fencingTokens); + + dispatcher.start(); + + try { + final UUID leaderId = UUID.randomUUID(); + dispatcherLeaderElectionService.isLeader(leaderId); + + dispatcherLeaderElectionService.notLeader(); + + final DispatcherId firstFencingToken = fencingTokens.take(); + + assertThat(firstFencingToken, equalTo(NULL_FENCING_TOKEN)); + + enterGetJobIdsLatch.await(); + proceedGetJobIdsLatch.trigger(); + + assertThat(dispatcher.getNumberJobs(timeout).get(), is(0)); + + } finally { + RpcUtils.terminateRpcEndpoint(dispatcher, timeout); + } + } + + @Nonnull + private HATestingDispatcher createHADispatcher(TestingHighAvailabilityServices highAvailabilityServices, BlockingQueue<DispatcherId> fencingTokens) throws Exception { + final Configuration configuration = new Configuration(); + return new HATestingDispatcher( rpcService, UUID.randomUUID().toString(), configuration, @@ -139,24 +166,50 @@ public class DispatcherHATest extends TestLogger { new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()), testingFatalErrorHandler, fencingTokens); + } + + /** + * Tests that all JobManagerRunner are terminated if the leadership of the + * Dispatcher is revoked. + */ + @Test + public void testRevokeLeadershipTerminatesJobManagerRunners() throws Exception { + + final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + highAvailabilityServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore()); + + final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); + highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService); + + final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2); + final HATestingDispatcher dispatcher = createHADispatcher( + highAvailabilityServices, + fencingTokens); dispatcher.start(); try { - final UUID leaderId = UUID.randomUUID(); - dispatcherLeaderElectionService.isLeader(leaderId); + // grant leadership and submit a single job + final DispatcherId expectedDispatcherId = DispatcherId.generate(); - dispatcherLeaderElectionService.notLeader(); + leaderElectionService.isLeader(expectedDispatcherId.toUUID()).get(); - final DispatcherId firstFencingToken = fencingTokens.take(); + assertThat(fencingTokens.take(), is(equalTo(expectedDispatcherId))); - assertThat(firstFencingToken, equalTo(NULL_FENCING_TOKEN)); + final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - enterGetJobIdsLatch.await(); - proceedGetJobIdsLatch.trigger(); + final CompletableFuture<Acknowledge> submissionFuture = dispatcherGateway.submitJob(createNonEmptyJobGraph(), timeout); - assertThat(dispatcher.getNumberJobs(timeout).get(), is(0)); + submissionFuture.get(); + + assertThat(dispatcher.getNumberJobs(timeout).get(), is(1)); + // revoke the leadership --> this should stop all running JobManagerRunners + leaderElectionService.notLeader(); + + assertThat(fencingTokens.take(), is(equalTo(NULL_FENCING_TOKEN))); + + assertThat(dispatcher.getNumberJobs(timeout).get(), is(0)); } finally { RpcUtils.terminateRpcEndpoint(dispatcher, timeout); }