GitHub user Clarkkkkk commented on a diff in the pull request:

https://github.com/apache/flink/pull/6192#discussion_r199035271

--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -421,4 +425,139 @@ public void testDeleteApplicationFiles() throws Exception {
             assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
         }};
     }
+
+    @Test
+    public void testOnContainerCompleted() throws Exception {
+        new Context() {{
+            startResourceManager();
+            CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+                rmServices.slotManager.registerSlotRequest(
+                    new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+                return null;
+            });
+            // wait for the registerSlotRequest completion
+            registerSlotRequestFuture.get();
+            // Callback from YARN when container is allocated.
+            Container testingContainer = mock(Container.class);
+            when(testingContainer.getId()).thenReturn(
+                ContainerId.newInstance(
+                    ApplicationAttemptId.newInstance(
+                        ApplicationId.newInstance(System.currentTimeMillis(), 1),
+                        1),
+                    1));
+            when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
+            when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+            when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
+
+            ImmutableList<Container> testingContainerList = ImmutableList.of(testingContainer);
+            resourceManager.onContainersAllocated(testingContainerList);
+            verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+            verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
+
+            // Remote task executor registers with YarnResourceManager.
+            TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
--- End diff --

Sure, I will modify it later.
---