Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5107#discussion_r154320903
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---
    @@ -117,84 +135,78 @@ public void testJobSubmission() throws Exception {
                        heartbeatServices,
                        mock(MetricRegistryImpl.class),
                        fatalErrorHandler,
    -                   jobManagerRunner,
    -                   jobId);
    +                   mockJobManagerRunner,
    +                   TEST_JOB_ID);
     
    -           try {
    -                   dispatcher.start();
    +           dispatcher.start();
    +   }
     
    -                   CompletableFuture<UUID> leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
    +   @After
    +   public void tearDown() throws Exception {
    +           try {
    +                   fatalErrorHandler.rethrowError();
    +           } finally {
    +                   RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
    +           }
    +   }
     
    -                   // wait for the leader to be elected
    -                   leaderFuture.get();
    +   /**
    +    * Tests that we can submit a job to the Dispatcher which then spawns a
    +    * new JobManagerRunner.
    +    */
    +   @Test
    +   public void testJobSubmission() throws Exception {
    +           CompletableFuture<UUID> leaderFuture = 
dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
     
    -                   DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
    +           // wait for the leader to be elected
    +           leaderFuture.get();
     
    -                   CompletableFuture<Acknowledge> acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, timeout);
    +           DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
     
    -                   acknowledgeFuture.get();
    +           CompletableFuture<Acknowledge> acknowledgeFuture = 
dispatcherGateway.submitJob(jobGraph, timeout);
     
    -                   verify(jobManagerRunner, 
Mockito.timeout(timeout.toMilliseconds())).start();
    +           acknowledgeFuture.get();
     
    -                   // check that no error has occurred
    -                   fatalErrorHandler.rethrowError();
    -           } finally {
    -                   RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
    -           }
    +           verify(mockJobManagerRunner, 
Mockito.timeout(timeout.toMilliseconds())).start();
        }
     
        /**
         * Tests that the dispatcher takes part in the leader election.
         */
        @Test
        public void testLeaderElection() throws Exception {
    -           TestingFatalErrorHandler fatalErrorHandler = new 
TestingFatalErrorHandler();
    -           TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
    -
                UUID expectedLeaderSessionId = UUID.randomUUID();
    -           CompletableFuture<UUID> leaderSessionIdFuture = new 
CompletableFuture<>();
    -           SubmittedJobGraphStore mockSubmittedJobGraphStore = 
mock(SubmittedJobGraphStore.class);
    -           TestingLeaderElectionService testingLeaderElectionService = new 
TestingLeaderElectionService() {
    -                   @Override
    -                   public void confirmLeaderSessionID(UUID 
leaderSessionId) {
    -                           super.confirmLeaderSessionID(leaderSessionId);
    -                           leaderSessionIdFuture.complete(leaderSessionId);
    -                   }
    -           };
    -
    -           
haServices.setSubmittedJobGraphStore(mockSubmittedJobGraphStore);
    -           
haServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
    -           HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 1000L);
    -           final JobID jobId = new JobID();
    -
    -           final TestingDispatcher dispatcher = new TestingDispatcher(
    -                   rpcService,
    -                   Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
    -                   new Configuration(),
    -                   haServices,
    -                   mock(ResourceManagerGateway.class),
    -                   mock(BlobServer.class),
    -                   heartbeatServices,
    -                   mock(MetricRegistryImpl.class),
    -                   fatalErrorHandler,
    -                   mock(JobManagerRunner.class),
    -                   jobId);
     
    -           try {
    -                   dispatcher.start();
    +           
assertNull(dispatcherLeaderElectionService.getConfirmationFuture());
     
    -                   assertFalse(leaderSessionIdFuture.isDone());
    +           
dispatcherLeaderElectionService.isLeader(expectedLeaderSessionId);
     
    -                   
testingLeaderElectionService.isLeader(expectedLeaderSessionId);
    +           UUID actualLeaderSessionId = 
dispatcherLeaderElectionService.getConfirmationFuture()
    +                   .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
     
    -                   UUID actualLeaderSessionId = 
leaderSessionIdFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +           assertEquals(expectedLeaderSessionId, actualLeaderSessionId);
     
    -                   assertEquals(expectedLeaderSessionId, 
actualLeaderSessionId);
    +           verify(submittedJobGraphStore, 
Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).getJobIds();
    +   }
     
    -                   verify(mockSubmittedJobGraphStore, 
Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).getJobIds();
    -           } finally {
    -                   RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
    -           }
    +   /**
    +    * Test callbacks from
    +    * {@link org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener}.
    +    */
    +   @Test
    +   public void testSubmittedJobGraphListener() throws Exception {
    +           CompletableFuture<UUID> leaderFuture = dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
    +           leaderFuture.get();
    +
    +           dispatcher.submitJob(jobGraph, timeout);
    +
    +           // pretend that other Dispatcher has removed job from submittedJobGraphStore
    +           dispatcher.onRemovedJobGraph(TEST_JOB_ID);
    +           assertThat(dispatcher.listJobs(timeout).get(), hasSize(0));
    --- End diff --
    
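    Test is not thread-safe. Unlike `testJobSubmission`, which obtains a `DispatcherGateway` via `dispatcher.getSelfGateway(DispatcherGateway.class)`, this test calls `submitJob`, `onRemovedJobGraph` and `listJobs` directly on the `dispatcher` instance from the test thread.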


---

Reply via email to