Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6192#discussion_r198915014
--- Diff:
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -421,4 +425,139 @@ public void testDeleteApplicationFiles() throws
Exception {
assertFalse("YARN application directory was not
removed", Files.exists(applicationDir.toPath()));
}};
}
+
+ @Test
+ public void testOnContainerCompleted() throws Exception {
+ new Context() {{
+ startResourceManager();
+ CompletableFuture<?> registerSlotRequestFuture =
resourceManager.runInMainThread(() -> {
+ rmServices.slotManager.registerSlotRequest(
+ new SlotRequest(new JobID(), new
AllocationID(), resourceProfile1, taskHost));
+ return null;
+ });
+ // wait for the registerSlotRequest completion
+ registerSlotRequestFuture.get();
+ // Callback from YARN when container is allocated.
+ Container testingContainer = mock(Container.class);
+ when(testingContainer.getId()).thenReturn(
+ ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+
ApplicationId.newInstance(System.currentTimeMillis(), 1),
+ 1),
+ 1));
+
when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container",
1234));
+
when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+
when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
+
+ ImmutableList<Container> testingContainerList =
ImmutableList.of(testingContainer);
+
resourceManager.onContainersAllocated(testingContainerList);
+
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+
verify(mockNMClient).startContainer(eq(testingContainer),
any(ContainerLaunchContext.class));
+
+ // Remote task executor registers with
YarnResourceManager.
+ TaskExecutorGateway mockTaskExecutorGateway =
mock(TaskExecutorGateway.class);
--- End diff --
Can we use the `TestingTaskExecutorGateway` here instead of mocking `TaskExecutorGateway` with Mockito?
---