Github user Clarkkkkk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199035271
  
    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -421,4 +425,139 @@ public void testDeleteApplicationFiles() throws Exception {
                        assertFalse("YARN application directory was not 
removed", Files.exists(applicationDir.toPath()));
                }};
        }
    +
    +   @Test
    +   public void testOnContainerCompleted() throws Exception {
    +           new Context() {{
    +                   startResourceManager();
    +                   CompletableFuture<?> registerSlotRequestFuture = 
resourceManager.runInMainThread(() -> {
    +                           rmServices.slotManager.registerSlotRequest(
    +                                   new SlotRequest(new JobID(), new 
AllocationID(), resourceProfile1, taskHost));
    +                           return null;
    +                   });
    +                   // wait for the registerSlotRequest completion
    +                   registerSlotRequestFuture.get();
    +                   // Callback from YARN when container is allocated.
    +                   Container testingContainer = mock(Container.class);
    +                   when(testingContainer.getId()).thenReturn(
    +                           ContainerId.newInstance(
    +                                   ApplicationAttemptId.newInstance(
    +                                           
ApplicationId.newInstance(System.currentTimeMillis(), 1),
    +                                           1),
    +                                   1));
    +                   
when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 
1234));
    +                   
when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
    +                   
when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
    +
    +                   ImmutableList<Container> testingContainerList = 
ImmutableList.of(testingContainer);
    +                   
resourceManager.onContainersAllocated(testingContainerList);
    +                   
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
    +                   
verify(mockNMClient).startContainer(eq(testingContainer), 
any(ContainerLaunchContext.class));
    +
    +                   // Remote task executor registers with 
YarnResourceManager.
    +                   TaskExecutorGateway mockTaskExecutorGateway = 
mock(TaskExecutorGateway.class);
    --- End diff --
    
    Sure, I will modify it later.


---

Reply via email to