tillrohrmann commented on a change in pull request #16847:
URL: https://github.com/apache/flink/pull/16847#discussion_r694827555



##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -689,4 +690,54 @@ public void onStopContainerError(ContainerId containerId, 
Throwable throwable) {
                     throwable);
         }
     }
+
+    public static String getContainerCompletedCause(ContainerStatus 
containerStatus) {
+        final String completeContainerMessage;
+        switch (containerStatus.getExitStatus()) {
+            case ContainerExitStatus.SUCCESS:
+                completeContainerMessage =
+                        String.format(
+                                "Container %s exited normally. Diagnostics: 
%s",
+                                containerStatus.getContainerId().toString(),
+                                containerStatus.getDiagnostics());
+                break;
+            case ContainerExitStatus.PREEMPTED:
+                completeContainerMessage =
+                        String.format(
+                                "Container %s was preempted by yarn. 
Diagnostics: %s",
+                                containerStatus.getContainerId().toString(),
+                                containerStatus.getDiagnostics());
+                break;
+            case ContainerExitStatus.INVALID:
+                completeContainerMessage =
+                        String.format(
+                                "Container %s was invalid. Diagnostics: %s",
+                                containerStatus.getContainerId().toString(),
+                                containerStatus.getDiagnostics());
+                break;
+            case ContainerExitStatus.ABORTED:
+                completeContainerMessage =
+                        String.format(
+                                "Container %s killed by YARN because being 
released by the application or being 'lost' due to node failures etc. 
Diagnostics: %s",

Review comment:
       ```suggestion
                                   "Container %s killed by YARN, either due to 
being released by the application or being 'lost' due to node failures etc. 
Diagnostics: %s",
   ```

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
##########
@@ -669,4 +650,111 @@ void prepareForTestStartTaskExecutorProcessVariousSpec(
                     (ignore1, ignore2) -> 
getDriver().requestResource(taskExecutorProcessSpec));
         }
     }
+
+    @Test
+    public void testGetConatainerCompletedCauseForSuccess() throws Exception {
+        ContainerStatus containerStatus =
+                ContainerStatusPBImpl.newInstance(
+                        testingContainer.getId(),
+                        ContainerState.COMPLETE,
+                        "success exit code",
+                        ContainerExitStatus.SUCCESS);
+        testingGetContainerCompletedCause(containerStatus);
+    }
+
+    @Test
+    public void testGetConatainerCompletedCauseForPreempteds() throws 
Exception {
+        ContainerStatus containerStatus =
+                ContainerStatusPBImpl.newInstance(
+                        testingContainer.getId(),
+                        ContainerState.COMPLETE,
+                        "preempted exit code",
+                        ContainerExitStatus.PREEMPTED);
+        testingGetContainerCompletedCause(containerStatus);
+    }
+
+    @Test
+    public void testGetConatainerCompletedCauseForInvalid() throws Exception {
+        ContainerStatus containerStatus =
+                ContainerStatusPBImpl.newInstance(
+                        testingContainer.getId(),
+                        ContainerState.COMPLETE,
+                        "invalid exit code",
+                        ContainerExitStatus.INVALID);
+        testingGetContainerCompletedCause(containerStatus);
+    }
+
+    @Test
+    public void testGetConatainerCompletedCauseForAborted() throws Exception {
+        ContainerStatus containerStatus =
+                ContainerStatusPBImpl.newInstance(
+                        testingContainer.getId(),
+                        ContainerState.COMPLETE,
+                        "aborted exit code",
+                        ContainerExitStatus.ABORTED);
+        testingGetContainerCompletedCause(containerStatus);
+    }
+
+    @Test
+    public void testGetConatainerCompletedCauseForDiskFailed() throws 
Exception {
+        ContainerStatus containerStatus =
+                ContainerStatusPBImpl.newInstance(
+                        testingContainer.getId(),
+                        ContainerState.COMPLETE,
+                        "disk failed exit code",
+                        ContainerExitStatus.DISKS_FAILED);
+        testingGetContainerCompletedCause(containerStatus);
+    }
+
+    @Test
+    public void testGetConatainerCompletedCauseForUnknown() throws Exception {
+        ContainerStatus containerStatus =
+                ContainerStatusPBImpl.newInstance(
+                        testingContainer.getId(), ContainerState.COMPLETE, 
"unknown exit code", -1);
+        testingGetContainerCompletedCause(containerStatus);
+    }
+
+    public void testingGetContainerCompletedCause(ContainerStatus 
containerStatus)
+            throws Exception {
+        new Context() {
+            {
+                addContainerRequestFutures.add(new CompletableFuture<>());
+                addContainerRequestFutures.add(new CompletableFuture<>());
+
+                
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer(
+                        (ignored1, ignored2) ->
+                                addContainerRequestFutures
+                                        .get(
+                                                
addContainerRequestFuturesNumCompleted
+                                                        .getAndIncrement())
+                                        .complete(null));
+                resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer(
+                        (ignore1, ignore2) ->
+                                
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
+                runTest(
+                        () -> {
+                            runInMainThread(
+                                    () ->
+                                            getDriver()
+                                                    .requestResource(
+                                                            
testingTaskExecutorProcessSpec));
+                            
resourceManagerClientCallbackHandler.onContainersAllocated(
+                                    ImmutableList.of(testingContainer));
+                            ContainerStatus testingContainerStatus =
+                                    
createTestingContainerCompletedStatus(containerStatus);
+
+                            assertTrue(
+                                    
YarnResourceManagerDriver.getContainerCompletedCause(
+                                                    testingContainerStatus)
+                                            
.contains(testingContainerStatus.getDiagnostics()));

Review comment:
       I would suggest factoring this out into its own test case. That way we 
have one test that checks the completed cause and another that tests the 
container-completed callback.

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
##########
@@ -627,9 +605,12 @@ protected void 
validateReleaseResources(Collection<YarnWorkerNode> workerNodes)
             verifyFutureCompleted(releaseAssignedContainerFuture);
         }
 
-        ContainerStatus createTestingContainerCompletedStatus(final 
ContainerId containerId) {
+        ContainerStatus createTestingContainerCompletedStatus(ContainerStatus 
containerStatus) {
             return new TestingContainerStatus(
-                    containerId, ContainerState.COMPLETE, "Test exit", -1);
+                    containerStatus.getContainerId(),
+                    containerStatus.getState(),
+                    containerStatus.getDiagnostics(),
+                    containerStatus.getExitStatus());
         }

Review comment:
       Maybe we can just keep `createTestingContainerCompletedStatus(final 
ContainerId containerId)` to create a `COMPLETE` container status with 
`ContainerExitStatus.SUCCESS`. For other `ContainerStatus` instances we can use 
the factory method `ContainerStatusPBImpl.newInstance`.

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
##########
@@ -627,9 +605,12 @@ protected void 
validateReleaseResources(Collection<YarnWorkerNode> workerNodes)
             verifyFutureCompleted(releaseAssignedContainerFuture);
         }
 
-        ContainerStatus createTestingContainerCompletedStatus(final 
ContainerId containerId) {
+        ContainerStatus createTestingContainerCompletedStatus(ContainerStatus 
containerStatus) {
             return new TestingContainerStatus(

Review comment:
       Why do we create a new `TestingContainerStatus` if we have already 
created a `ContainerStatus` instance via `ContainerStatusPBImpl`?

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
##########
@@ -669,4 +650,111 @@ void prepareForTestStartTaskExecutorProcessVariousSpec(
                     (ignore1, ignore2) -> 
getDriver().requestResource(taskExecutorProcessSpec));
         }
     }
+
+    @Test
+    public void testGetConatainerCompletedCauseForSuccess() throws Exception {
+        ContainerStatus containerStatus =
+                ContainerStatusPBImpl.newInstance(
+                        testingContainer.getId(),
+                        ContainerState.COMPLETE,
+                        "success exit code",
+                        ContainerExitStatus.SUCCESS);

Review comment:
       If we split `testingGetContainerCompletedCause` into two tests, then we 
can create a factory method for the different `ContainerStatus` instances that 
we need for the two tests.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to