GitHub user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5318#discussion_r162867968
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---
@@ -1425,4 +1440,137 @@ public void testFilterOutDuplicateJobMasterRegistrations() throws Exception {
taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}
}
+
+ /**
+ * Tests that the heartbeat is stopped once the TaskExecutor detects that the RM is no longer leader.
+ *
+ * <p>See FLINK-8462
+ */
+ @Test
+ public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
+ final long heartbeatInterval = 1L;
+ final long heartbeatTimeout = 1000L;
+ final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(new Configuration());
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+ final RecordingHeartbeatServices heartbeatServices = new RecordingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
+ final ResourceID rmResourceID = ResourceID.generate();
+
+ final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+
+ final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+ final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
+ haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+
+ final String rmAddress = "rm";
+ final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(
+ ResourceManagerId.generate(),
+ rmResourceID,
+ heartbeatInterval,
+ rmAddress,
+ rmAddress);
+
+ final CompletableFuture<ResourceID> registeredTaskManagerFuture = new CompletableFuture<>();
+
+ rmGateway.setRegisterTaskExecutorFunction(
+ info -> {
+ registeredTaskManagerFuture.complete(info.f1);
+ return CompletableFuture.completedFuture(
+ new TaskExecutorRegistrationSuccess(
+ new InstanceID(),
+ rmResourceID,
+ heartbeatInterval));
+ });
+
+ rpc.registerGateway(rmAddress, rmGateway);
+
+ final TaskExecutor taskExecutor = new TaskExecutor(
+ rpc,
+ taskManagerConfiguration,
+ taskManagerLocation,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ mock(NetworkEnvironment.class),
+ haServices,
+ heartbeatServices,
+ mock(TaskManagerMetricGroup.class),
+ mock(BroadcastVariableManager.class),
+ mock(FileCache.class),
+ taskSlotTable,
+ mock(JobManagerTable.class),
+ mock(JobLeaderService.class),
+ testingFatalErrorHandler);
+
+ try {
+ taskExecutor.start();
+
+ rmLeaderRetrievalService.notifyListener(rmAddress, rmGateway.getFencingToken().toUUID());
+
+ // wait for TM registration
+ assertThat(
+ registeredTaskManagerFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS),
+ org.hamcrest.Matchers.equalTo(taskManagerLocation.getResourceID()));
+
+ final BlockingQueue<ResourceID> unmonitoredTargets = heartbeatServices.getUnmonitoredTargets();
+
+ // let RM lose leadership
+ rmLeaderRetrievalService.notifyListener(null, null);
+
+ // the timeout should not have triggered since it is much higher
+ assertThat(unmonitoredTargets.poll(100L, TimeUnit.MILLISECONDS), org.hamcrest.Matchers.equalTo(rmResourceID));
--- End diff --
I'd add a static import for `equalTo`, or simply use `assertEquals` since
the other tests already do so.
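
For illustration, a rough sketch of what the two options could look like. It is not taken from the PR: the class name `AssertionStyleSketch`, the method `checkUnmonitoredTarget`, and the use of `String` in place of `ResourceID` are all made up for the example. To keep it compilable without JUnit or Hamcrest on the classpath, `assertEquals` here is a local stand-in with the same argument order as `org.junit.Assert.assertEquals`; the real forms are shown in the comments.

```java
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class AssertionStyleSketch {

    // Hypothetical stand-in for org.junit.Assert.assertEquals(expected, actual),
    // defined locally so the sketch compiles without JUnit.
    static void assertEquals(Object expected, Object actual) {
        if (!Objects.equals(expected, actual)) {
            throw new AssertionError("expected <" + expected + "> but was <" + actual + ">");
        }
    }

    // Polls the queue with a timeout and checks the polled element.
    // String is used instead of ResourceID purely for illustration.
    static void checkUnmonitoredTarget(BlockingQueue<String> unmonitoredTargets, String expected)
            throws InterruptedException {
        // In the actual test, with JUnit 4 and Hamcrest available, option 1 would be:
        //   import static org.hamcrest.Matchers.equalTo;
        //   assertThat(unmonitoredTargets.poll(100L, TimeUnit.MILLISECONDS), equalTo(rmResourceID));
        // and option 2 would be:
        //   import static org.junit.Assert.assertEquals;
        //   assertEquals(rmResourceID, unmonitoredTargets.poll(100L, TimeUnit.MILLISECONDS));
        assertEquals(expected, unmonitoredTargets.poll(100L, TimeUnit.MILLISECONDS));
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("rm-resource-id");
        checkUnmonitoredTarget(queue, "rm-resource-id");
    }
}
```

Either way the fully qualified `org.hamcrest.Matchers.equalTo` disappears from the assertion line. Note that `poll` returns `null` on timeout, so both forms fail cleanly if no target was unmonitored.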
---