This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d1dcb68abac2ef781a27f28edd055cfe3efa87e8
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Fri Aug 17 15:24:45 2018 +0200

    [hotfix][tests] Ensure that JobManagerRunners are stopped when Dispatcher loses leadership
---
 .../flink/runtime/dispatcher/DispatcherHATest.java | 75 ++++++++++++++++++----
 1 file changed, 64 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index adf7618..cb26f48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -30,9 +30,11 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -111,8 +113,6 @@ public class DispatcherHATest extends TestLogger {
         */
        @Test
        public void testGrantingRevokingLeadership() throws Exception {
-
-               final Configuration configuration = new Configuration();
                final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
                final JobGraph nonEmptyJobGraph = createNonEmptyJobGraph();
                final SubmittedJobGraph submittedJobGraph = new 
SubmittedJobGraph(nonEmptyJobGraph, null);
@@ -125,7 +125,34 @@ public class DispatcherHATest extends TestLogger {
 
                final BlockingQueue<DispatcherId> fencingTokens = new 
ArrayBlockingQueue<>(2);
 
-               final HATestingDispatcher dispatcher = new HATestingDispatcher(
+               final HATestingDispatcher dispatcher = 
createHADispatcher(highAvailabilityServices, fencingTokens);
+
+               dispatcher.start();
+
+               try {
+                       final UUID leaderId = UUID.randomUUID();
+                       dispatcherLeaderElectionService.isLeader(leaderId);
+
+                       dispatcherLeaderElectionService.notLeader();
+
+                       final DispatcherId firstFencingToken = 
fencingTokens.take();
+
+                       assertThat(firstFencingToken, 
equalTo(NULL_FENCING_TOKEN));
+
+                       enterGetJobIdsLatch.await();
+                       proceedGetJobIdsLatch.trigger();
+
+                       assertThat(dispatcher.getNumberJobs(timeout).get(), 
is(0));
+
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+               }
+       }
+
+       @Nonnull
+       private HATestingDispatcher 
createHADispatcher(TestingHighAvailabilityServices highAvailabilityServices, 
BlockingQueue<DispatcherId> fencingTokens) throws Exception {
+               final Configuration configuration = new Configuration();
+               return new HATestingDispatcher(
                        rpcService,
                        UUID.randomUUID().toString(),
                        configuration,
@@ -139,24 +166,50 @@ public class DispatcherHATest extends TestLogger {
                        new TestingJobManagerRunnerFactory(new 
CompletableFuture<>(), new CompletableFuture<>()),
                        testingFatalErrorHandler,
                        fencingTokens);
+       }
+
+       /**
+        * Tests that all JobManagerRunner are terminated if the leadership of 
the
+        * Dispatcher is revoked.
+        */
+       @Test
+       public void testRevokeLeadershipTerminatesJobManagerRunners() throws 
Exception {
+
+               final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
+               highAvailabilityServices.setSubmittedJobGraphStore(new 
StandaloneSubmittedJobGraphStore());
+
+               final TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
+               
highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+               final ArrayBlockingQueue<DispatcherId> fencingTokens = new 
ArrayBlockingQueue<>(2);
+               final HATestingDispatcher dispatcher = createHADispatcher(
+                       highAvailabilityServices,
+                       fencingTokens);
 
                dispatcher.start();
 
                try {
-                       final UUID leaderId = UUID.randomUUID();
-                       dispatcherLeaderElectionService.isLeader(leaderId);
+                       // grant leadership and submit a single job
+                       final DispatcherId expectedDispatcherId = 
DispatcherId.generate();
 
-                       dispatcherLeaderElectionService.notLeader();
+                       
leaderElectionService.isLeader(expectedDispatcherId.toUUID()).get();
 
-                       final DispatcherId firstFencingToken = 
fencingTokens.take();
+                       assertThat(fencingTokens.take(), 
is(equalTo(expectedDispatcherId)));
 
-                       assertThat(firstFencingToken, 
equalTo(NULL_FENCING_TOKEN));
+                       final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
-                       enterGetJobIdsLatch.await();
-                       proceedGetJobIdsLatch.trigger();
+                       final CompletableFuture<Acknowledge> submissionFuture = 
dispatcherGateway.submitJob(createNonEmptyJobGraph(), timeout);
 
-                       assertThat(dispatcher.getNumberJobs(timeout).get(), 
is(0));
+                       submissionFuture.get();
+
+                       assertThat(dispatcher.getNumberJobs(timeout).get(), 
is(1));
 
+                       // revoke the leadership --> this should stop all 
running JobManagerRunners
+                       leaderElectionService.notLeader();
+
+                       assertThat(fencingTokens.take(), 
is(equalTo(NULL_FENCING_TOKEN)));
+
+                       assertThat(dispatcher.getNumberJobs(timeout).get(), 
is(0));
                } finally {
                        RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
                }

Reply via email to