Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102945494
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 ---
    @@ -105,6 +110,105 @@
        @Rule
        public TestName name = new TestName();
     
    +   @Test
    +   public void testHeartbeatTimeoutWithJobManager() throws Exception {
    +           final JobID jobId = new JobID();
    +           final Configuration configuration = new Configuration();
    +           final TaskManagerConfiguration tmConfig = 
TaskManagerConfiguration.fromConfiguration(configuration);
    +           final ResourceID tmResourceId = new ResourceID("tm");
    +           final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
    +           final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), 
mock(TimerService.class));
    +
    +           final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
    +           final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
    +           final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
    +           final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService();
    +           final TestingLeaderRetrievalService jmLeaderRetrievalService = 
new TestingLeaderRetrievalService();
    +           haServices.setJobMasterLeaderRetriever(jobId, 
jmLeaderRetrievalService);
    +           
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
    +
    +           final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
    +
    +           final long heartbeatTimeout = 1000L;
    +           final HeartbeatManagerImpl<Object, Object> tmHeartbeatManager = 
new HeartbeatManagerImpl<>(
    +                           heartbeatTimeout,
    +                           tmResourceId,
    +                           rpc.getExecutor(),
    +                           Executors.newSingleThreadScheduledExecutor(),
    +                           log);
    +
    +           final String jobMasterAddress = "jm";
    +           final UUID jmLeaderId = UUID.randomUUID();
    +           final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
    +           final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
    +           final int blobPort = 42;
    +
    +           when(jobMasterGateway.registerTaskManager(
    +                           any(String.class),
    +                           eq(taskManagerLocation),
    +                           eq(jmLeaderId),
    +                           any(Time.class)
    +           
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
    +           
when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
    +
    +           try {
    +                   final TaskExecutor taskManager = new TaskExecutor(
    +                                   tmConfig,
    +                                   taskManagerLocation,
    +                                   rpc,
    +                                   mock(MemoryManager.class),
    +                                   mock(IOManager.class),
    +                                   mock(NetworkEnvironment.class),
    +                                   haServices,
    +                                   mock(MetricRegistry.class),
    +                                   tmHeartbeatManager,
    +                                   mock(TaskManagerMetricGroup.class),
    +                                   mock(BroadcastVariableManager.class),
    +                                   mock(FileCache.class),
    +                                   taskSlotTable,
    +                                   new JobManagerTable(),
    +                                   jobLeaderService,
    +                                   testingFatalErrorHandler);
    +
    +                   taskManager.start();
    +
    +                   rpc.registerGateway(jobMasterAddress, jobMasterGateway);
    +
    +                   // we have to add the job after the TaskExecutor, 
because otherwise the service has not
    +                   // been properly started.
    +                   jobLeaderService.addJob(jobId, jobMasterAddress);
    +
    +                   // now inform the task manager about the new job leader
    +                   
jmLeaderRetrievalService.notifyListener(jobMasterAddress, jmLeaderId);
    +
    +                   // register task manager success will trigger 
monitoring heartbeat target between tm and jm
    +                   verify(jobMasterGateway).registerTaskManager(
    +                                   eq(taskManager.getAddress()), 
eq(taskManagerLocation), eq(jmLeaderId), any(Time.class));
    +
    +                   final ConcurrentHashMap<ResourceID, Object> 
heartbeatTargets = Whitebox.getInternalState(tmHeartbeatManager, 
"heartbeatTargets");
    +                   final JobManagerTable jobManagerTable = 
Whitebox.getInternalState(taskManager, "jobManagerTable");
    +                   final Map<ResourceID, JobManagerConnection> 
jobManagerConnections = Whitebox.getInternalState(taskManager, 
"jobManagerConnections");
    +
    +                   // before heartbeat timeout
    +                   assertTrue(heartbeatTargets.containsKey(jmResourceId));
    +                   assertTrue(jobManagerTable.contains(jobId));
    +                   
assertTrue(jobManagerConnections.containsKey(jmResourceId));
    +
    +                   // the job manager will not schedule heartbeat because 
of mock and the task manager will be notified heartbeat timeout
    +                   Thread.sleep(heartbeatTimeout);
    +
    +                   // after heartbeat timeout
    +                   assertFalse(jobManagerTable.contains(jobId));
    +                   
assertFalse(jobManagerConnections.containsKey(jmResourceId));
    +                   
verify(jobMasterGateway).disconnectTaskManager(eq(tmResourceId));
    --- End diff --
    
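    It would be better to introduce the timeout here, e.g. 
`verify(jobMasterGateway, timeout(heartbeatTimeout)).disconnectTaskManager(eq(tmResourceId))`. 
That way we wait up to a given amount of time for the `disconnectTaskManager` call, which 
should have happened by then, instead of checking only once after a fixed sleep.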


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to