Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/3151#discussion_r102945494
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
---
@@ -105,6 +110,105 @@
@Rule
public TestName name = new TestName();
+ @Test
+ public void testHeartbeatTimeoutWithJobManager() throws Exception {
+ final JobID jobId = new JobID();
+ final Configuration configuration = new Configuration();
+ final TaskManagerConfiguration tmConfig =
TaskManagerConfiguration.fromConfiguration(configuration);
+ final ResourceID tmResourceId = new ResourceID("tm");
+ final TaskManagerLocation taskManagerLocation = new
TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
+ final TaskSlotTable taskSlotTable = new
TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)),
mock(TimerService.class));
+
+ final TestingSerialRpcService rpc = new
TestingSerialRpcService();
+ final JobLeaderService jobLeaderService = new
JobLeaderService(taskManagerLocation);
+ final TestingHighAvailabilityServices haServices = new
TestingHighAvailabilityServices();
+ final TestingLeaderRetrievalService rmLeaderRetrievalService =
new TestingLeaderRetrievalService();
+ final TestingLeaderRetrievalService jmLeaderRetrievalService =
new TestingLeaderRetrievalService();
+ haServices.setJobMasterLeaderRetriever(jobId,
jmLeaderRetrievalService);
+
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+
+ final TestingFatalErrorHandler testingFatalErrorHandler = new
TestingFatalErrorHandler();
+
+ final long heartbeatTimeout = 1000L;
+ final HeartbeatManagerImpl<Object, Object> tmHeartbeatManager =
new HeartbeatManagerImpl<>(
+ heartbeatTimeout,
+ tmResourceId,
+ rpc.getExecutor(),
+ Executors.newSingleThreadScheduledExecutor(),
+ log);
+
+ final String jobMasterAddress = "jm";
+ final UUID jmLeaderId = UUID.randomUUID();
+ final ResourceID jmResourceId = new
ResourceID(jobMasterAddress);
+ final JobMasterGateway jobMasterGateway =
mock(JobMasterGateway.class);
+ final int blobPort = 42;
+
+ when(jobMasterGateway.registerTaskManager(
+ any(String.class),
+ eq(taskManagerLocation),
+ eq(jmLeaderId),
+ any(Time.class)
+
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new
JMTMRegistrationSuccess(jmResourceId, blobPort)));
+
when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
+
+ try {
+ final TaskExecutor taskManager = new TaskExecutor(
+ tmConfig,
+ taskManagerLocation,
+ rpc,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ mock(NetworkEnvironment.class),
+ haServices,
+ mock(MetricRegistry.class),
+ tmHeartbeatManager,
+ mock(TaskManagerMetricGroup.class),
+ mock(BroadcastVariableManager.class),
+ mock(FileCache.class),
+ taskSlotTable,
+ new JobManagerTable(),
+ jobLeaderService,
+ testingFatalErrorHandler);
+
+ taskManager.start();
+
+ rpc.registerGateway(jobMasterAddress, jobMasterGateway);
+
+ // we have to add the job after the TaskExecutor,
because otherwise the service has not
+ // been properly started.
+ jobLeaderService.addJob(jobId, jobMasterAddress);
+
+ // now inform the task manager about the new job leader
+
jmLeaderRetrievalService.notifyListener(jobMasterAddress, jmLeaderId);
+
+ // register task manager success will trigger
monitoring heartbeat target between tm and jm
+ verify(jobMasterGateway).registerTaskManager(
+ eq(taskManager.getAddress()),
eq(taskManagerLocation), eq(jmLeaderId), any(Time.class));
+
+ final ConcurrentHashMap<ResourceID, Object>
heartbeatTargets = Whitebox.getInternalState(tmHeartbeatManager,
"heartbeatTargets");
+ final JobManagerTable jobManagerTable =
Whitebox.getInternalState(taskManager, "jobManagerTable");
+ final Map<ResourceID, JobManagerConnection>
jobManagerConnections = Whitebox.getInternalState(taskManager,
"jobManagerConnections");
+
+ // before heartbeat timeout
+ assertTrue(heartbeatTargets.containsKey(jmResourceId));
+ assertTrue(jobManagerTable.contains(jobId));
+
assertTrue(jobManagerConnections.containsKey(jmResourceId));
+
+ // the job manager will not schedule heartbeat because
of mock and the task manager will be notified heartbeat timeout
+ Thread.sleep(heartbeatTimeout);
+
+ // after heartbeat timeout
+ assertFalse(jobManagerTable.contains(jobId));
+
assertFalse(jobManagerConnections.containsKey(jmResourceId));
+
verify(jobMasterGateway).disconnectTaskManager(eq(tmResourceId));
--- End diff --
It would be better to add a timeout to this `verify` call. That way the test
waits up to a given time for the `disconnectTaskManager` call to happen,
instead of asserting immediately after a fixed sleep.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like to have it, or if the feature is enabled but not
working, please contact infrastructure at [email protected] or file a JIRA
ticket with INFRA.
---