GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6067#discussion_r190611399
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
---
@@ -1483,6 +1485,216 @@ public void
testMaximumRegistrationDurationAfterConnectionLoss() throws Exceptio
}
}
+ /**
+ * Tests that we ignore slot requests if the TaskExecutor is not
+ * registered at a ResourceManager.
+ *
+ * <p>The registration response future returned by the testing
+ * ResourceManager gateway is never completed, so the TaskExecutor's
+ * registration attempt stays pending while the slot request is issued.
+ * The slot request is expected to fail with a {@link TaskManagerException}.
+ */
+ @Test
+ public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
+ final TaskSlotTable taskSlotTable = new
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+ final TaskManagerServices taskManagerServices = new
TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
+
+ final TaskExecutor taskExecutor =
createTaskExecutor(taskManagerServices);
+
+ taskExecutor.start();
+
+ try {
+ final TestingResourceManagerGateway
testingResourceManagerGateway = new TestingResourceManagerGateway();
+
+ final CompletableFuture<RegistrationResponse>
registrationFuture = new CompletableFuture<>(); // intentionally never completed
+ final CompletableFuture<ResourceID>
taskExecutorResourceIdFuture = new CompletableFuture<>();
+
+
testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5
-> {
+
taskExecutorResourceIdFuture.complete(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1); // record the ResourceID sent in the registration call
+ return registrationFuture; // pending response keeps the TaskExecutor unregistered
+ });
+
+
rpc.registerGateway(testingResourceManagerGateway.getAddress(),
testingResourceManagerGateway);
+
resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(),
testingResourceManagerGateway.getFencingToken().toUUID());
+
+ final TaskExecutorGateway taskExecutorGateway =
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+ final ResourceID resourceId =
taskExecutorResourceIdFuture.get(); // blocks until the registration attempt has reached the gateway
+
+ final SlotID slotId = new SlotID(resourceId, 0);
+ final CompletableFuture<Acknowledge>
slotRequestResponse = taskExecutorGateway.requestSlot(slotId, jobId, new
AllocationID(), "foobar", testingResourceManagerGateway.getFencingToken(),
timeout);
+
+ try {
+ slotRequestResponse.get();
+ fail("We should not be able to request slots
before the TaskExecutor is registered at the ResourceManager.");
+ } catch (ExecutionException ee) {
+
assertThat(ExceptionUtils.stripExecutionException(ee),
instanceOf(TaskManagerException.class));
+ }
+ } finally {
+ RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); // always shut down, even if an assertion above failed
+ }
+ }
+
+ /**
+ * Tests that the TaskExecutor tries to reconnect to a ResourceManager
from which it
+ * was explicitly disconnected.
+ */
+ @Test
+ public void testReconnectionAttemptIfExplicitlyDisconnected() throws
Exception {
+ final long heartbeatInterval = 1000L;
+ final TaskSlotTable taskSlotTable = new
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+ final TaskManagerLocation taskManagerLocation = new
LocalTaskManagerLocation();
+ final TaskExecutor taskExecutor = new TaskExecutor(
+ rpc,
+
TaskManagerConfiguration.fromConfiguration(configuration),
+ haServices,
+ new TaskManagerServicesBuilder()
+ .setTaskSlotTable(taskSlotTable)
+ .setTaskManagerLocation(taskManagerLocation)
+ .build(),
+ new HeartbeatServices(heartbeatInterval, 1000L),
+
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+ dummyBlobCacheService,
+ testingFatalErrorHandler);
+
+ taskExecutor.start();
+
+ try {
+ final TestingResourceManagerGateway
testingResourceManagerGateway = new TestingResourceManagerGateway();
+ final ClusterInformation clusterInformation = new
ClusterInformation("foobar", 1234);
+ final CompletableFuture<RegistrationResponse>
registrationResponseFuture = CompletableFuture.completedFuture(new
TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(),
heartbeatInterval, clusterInformation));
+ final BlockingQueue<ResourceID> registrationQueue = new
ArrayBlockingQueue<>(1);
+
+
testingResourceManagerGateway.setRegisterTaskExecutorFunction(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5
-> {
+
registrationQueue.offer(stringResourceIDSlotReportIntegerHardwareDescriptionTuple5.f1);
+ return registrationResponseFuture;
+ });
+
rpc.registerGateway(testingResourceManagerGateway.getAddress(),
testingResourceManagerGateway);
+
+
resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(),
testingResourceManagerGateway.getFencingToken().toUUID());
+
+ final ResourceID firstRegistrationAttempt =
registrationQueue.take();
+
+ assertThat(firstRegistrationAttempt,
equalTo(taskManagerLocation.getResourceID()));
+
+ final TaskExecutorGateway taskExecutorGateway =
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+ assertThat(registrationQueue.isEmpty(), is(true));
--- End diff --
Good point. Will change it.
---