Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r154359352 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java --- @@ -117,84 +135,78 @@ public void testJobSubmission() throws Exception { heartbeatServices, mock(MetricRegistryImpl.class), fatalErrorHandler, - jobManagerRunner, - jobId); + mockJobManagerRunner, + TEST_JOB_ID); - try { - dispatcher.start(); + dispatcher.start(); + } - CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); + @After + public void tearDown() throws Exception { + try { + fatalErrorHandler.rethrowError(); + } finally { + RpcUtils.terminateRpcEndpoint(dispatcher, timeout); + } + } - // wait for the leader to be elected - leaderFuture.get(); + /** + * Tests that we can submit a job to the Dispatcher which then spawns a + * new JobManagerRunner. + */ + @Test + public void testJobSubmission() throws Exception { + CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); - DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + // wait for the leader to be elected + leaderFuture.get(); - CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - acknowledgeFuture.get(); + CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout); - verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start(); + acknowledgeFuture.get(); - // check that no error has occurred - fatalErrorHandler.rethrowError(); - } finally { - RpcUtils.terminateRpcEndpoint(dispatcher, timeout); - } + verify(mockJobManagerRunner, Mockito.timeout(timeout.toMilliseconds())).start(); } /** * Tests that the dispatcher takes part in the leader election. */ @Test public void testLeaderElection() throws Exception { - TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); - TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - UUID expectedLeaderSessionId = UUID.randomUUID(); - CompletableFuture<UUID> leaderSessionIdFuture = new CompletableFuture<>(); - SubmittedJobGraphStore mockSubmittedJobGraphStore = mock(SubmittedJobGraphStore.class); - TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService() { - @Override - public void confirmLeaderSessionID(UUID leaderSessionId) { - super.confirmLeaderSessionID(leaderSessionId); - leaderSessionIdFuture.complete(leaderSessionId); - } - }; - - haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore); - haServices.setDispatcherLeaderElectionService(testingLeaderElectionService); - HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); - final JobID jobId = new JobID(); - - final TestingDispatcher dispatcher = new TestingDispatcher( - rpcService, - Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(), - new Configuration(), - haServices, - mock(ResourceManagerGateway.class), - mock(BlobServer.class), - heartbeatServices, - mock(MetricRegistryImpl.class), - fatalErrorHandler, - mock(JobManagerRunner.class), - jobId); - try { - dispatcher.start(); + assertNull(dispatcherLeaderElectionService.getConfirmationFuture()); - assertFalse(leaderSessionIdFuture.isDone()); + dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId); - testingLeaderElectionService.isLeader(expectedLeaderSessionId); + UUID actualLeaderSessionId = dispatcherLeaderElectionService.getConfirmationFuture() + .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); - UUID actualLeaderSessionId = leaderSessionIdFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + assertEquals(expectedLeaderSessionId, actualLeaderSessionId); - assertEquals(expectedLeaderSessionId, actualLeaderSessionId); + verify(submittedJobGraphStore, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).getJobIds(); + } - verify(mockSubmittedJobGraphStore, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).getJobIds(); - } finally { - RpcUtils.terminateRpcEndpoint(dispatcher, timeout); - } + /** + * Test callbacks from + * {@link org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener}. + */ + @Test + public void testSubmittedJobGraphListener() throws Exception { + CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID()); + leaderFuture.get(); + + dispatcher.submitJob(jobGraph, timeout); + + // pretend that other Dispatcher has removed job from submittedJobGraphStore + dispatcher.onRemovedJobGraph(TEST_JOB_ID); --- End diff -- Interacting with the dispatcher via the self gateway should help.
---