tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258507526
 
 

 ##########
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##########
 @@ -518,4 +528,61 @@ public void testOnContainerCompleted() throws Exception {
                        });
                }};
        }
+
+       /**
+        *      Tests that YarnResourceManager rejects all pending slot requests when the maximum number of failed
+        *      containers is reached.
+        */
+       @Test
+       public void testOnContainersAllocatedWithFailure() throws Exception {
+               new Context() {{
+                       runTest(() -> {
+                               CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+                                       rmServices.slotManager.registerSlotRequest(
+                                               new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+                                       return null;
+                               });
+
+                               // wait for the registerSlotRequest completion
+                               registerSlotRequestFuture.get();
+
+                               // Callback from YARN when container is allocated.
+                               Container disconnectedContainer1 = mockContainer("container1", 1234, 1, resourceManager.getContainerResource());
+
+                               doReturn(Collections.singletonList(Collections.singletonList(resourceManager.getContainerRequest())))
+                                       .when(mockResourceManagerClient).getMatchingRequests(any(Priority.class), anyString(), any(Resource.class));
+
+                               resourceManager.onContainersAllocated(ImmutableList.of(disconnectedContainer1));
+                               verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+                               verify(mockNMClient).startContainer(eq(disconnectedContainer1), any(ContainerLaunchContext.class));
+
+                               ResourceID connectedTM = new ResourceID(disconnectedContainer1.getId().toString());
+
+                               resourceManager.registerTaskExecutor("container1", connectedTM, 1234,
+                                       hardwareDescription, Time.seconds(10L));
+
+                               // force to unregister the task manager
+                               resourceManager.disconnectTaskManager(connectedTM, new TimeoutException());
+
+                               // request second slot
+                               registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
+                                       rmServices.slotManager.registerSlotRequest(
+                                               new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
+                                       return null;
+                               });
+
+                               // wait for the registerSlotRequest completion
+                               registerSlotRequestFuture.get();
+                               Container failedContainer = mockContainer("container2", 2345, 2, resourceManager.getContainerResource());
+                               when(mockNMClient.startContainer(eq(failedContainer), any())).thenThrow(new YarnException("Failed"));
+
+                               CompletableFuture<?> rejectAllPendingRequestFuture = resourceManager.runInMainThread(() -> {
+                                       resourceManager.onContainersAllocated(ImmutableList.of(failedContainer));
+                                       return null;
+                               });
+                               rejectAllPendingRequestFuture.get();
+                               assertEquals(0, rmServices.slotManager.getNumberPendingSlotRequest());
+                       });
+               }};
 
 Review comment:
   This test is very hard to maintain because of all the mocking. I would
suggest adding a test for the `ResourceManager` that makes sure you cannot
start a new worker if the `FailureRater` exceeds the maximum failure rate.
Additionally, we should add test cases for the `YarnResourceManager` that make
sure that `onContainersCompleted` and failures in the `onContainersAllocated`
method are counted by the `failureRater`.
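
   To illustrate the kind of mock-free test suggested here, the following is a
minimal sketch only. `SlidingWindowFailureRater` and `WorkerStarter` are
hypothetical stand-ins invented for this example; they are not the
`FailureRater` or `ResourceManager` classes from the PR, and the sliding-window
policy is an assumption about how the rate could be measured. The point is that
the "no new worker above the maximum failure rate" rule can be checked with
plain objects and explicit timestamps.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class FailureRaterSketch {

    /** Hypothetical sliding-window failure counter (not Flink's FailureRater). */
    static final class SlidingWindowFailureRater {
        private final int maxFailuresPerInterval;
        private final long intervalMillis;
        private final Deque<Long> failureTimestamps = new ArrayDeque<>();

        SlidingWindowFailureRater(int maxFailuresPerInterval, long intervalMillis) {
            this.maxFailuresPerInterval = maxFailuresPerInterval;
            this.intervalMillis = intervalMillis;
        }

        void markFailure(long nowMillis) {
            failureTimestamps.addLast(nowMillis);
        }

        boolean exceedsFailureRate(long nowMillis) {
            // Drop failures that have fallen out of the window before counting.
            while (!failureTimestamps.isEmpty()
                    && nowMillis - failureTimestamps.peekFirst() >= intervalMillis) {
                failureTimestamps.removeFirst();
            }
            return failureTimestamps.size() > maxFailuresPerInterval;
        }
    }

    /** Hypothetical stand-in for the code path that starts new workers. */
    static final class WorkerStarter {
        private final SlidingWindowFailureRater failureRater;
        private int startedWorkers;

        WorkerStarter(SlidingWindowFailureRater failureRater) {
            this.failureRater = failureRater;
        }

        /** Returns false and starts nothing if the failure rate is exceeded. */
        boolean tryStartNewWorker(long nowMillis) {
            if (failureRater.exceedsFailureRate(nowMillis)) {
                return false;
            }
            startedWorkers++;
            return true;
        }

        void onWorkerFailed(long nowMillis) {
            failureRater.markFailure(nowMillis);
        }

        int getStartedWorkers() {
            return startedWorkers;
        }
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        // Allow at most one failure per 1000 ms window.
        WorkerStarter starter = new WorkerStarter(new SlidingWindowFailureRater(1, 1000L));

        check(starter.tryStartNewWorker(0L), "no failures yet, start allowed");
        starter.onWorkerFailed(10L);
        check(starter.tryStartNewWorker(20L), "one failure is within the limit");
        starter.onWorkerFailed(30L);
        check(!starter.tryStartNewWorker(40L), "two failures exceed the limit");
        check(starter.tryStartNewWorker(1031L), "old failures expired, start allowed");
        check(starter.getStartedWorkers() == 3, "rejected start must not count");
    }
}
```

   Passing the timestamp in explicitly keeps the check deterministic without a
mocked clock; the real tests would drive the actual `ResourceManager` and
`YarnResourceManager` callbacks instead of these stand-ins.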
