tillrohrmann commented on a change in pull request #16847:
URL: https://github.com/apache/flink/pull/16847#discussion_r692825217



##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -689,4 +690,54 @@ public void onStopContainerError(ContainerId containerId, 
Throwable throwable) {
                     throwable);
         }
     }
+
+    public String getConatainerCompletedCause(ContainerStatus containerStatus) 
{
+        String completeContainerMessage = containerStatus.getDiagnostics();
+        switch (containerStatus.getExitStatus()) {
+            case ContainerExitStatus.SUCCESS:
+                log.debug(
+                        "Executor for container {} exited because of a YARN 
event (e.g., "
+                                + "preemption) and not because of an error in 
the running job. Diagnostics: {}",

Review comment:
       If the container had been preempted, wouldn't we receive the exit status 
`ContainerExitStatus.PREEMPTED` instead? Maybe we should simply log that the 
container exited normally.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -689,4 +690,54 @@ public void onStopContainerError(ContainerId containerId, 
Throwable throwable) {
                     throwable);
         }
     }
+
+    public String getConatainerCompletedCause(ContainerStatus containerStatus) 
{

Review comment:
       Can't we make this function a static utility method? Then we could easily 
test it in isolation and wouldn't have to start the whole 
`YarnResourceManagerDriver`.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -689,4 +690,54 @@ public void onStopContainerError(ContainerId containerId, 
Throwable throwable) {
                     throwable);
         }
     }
+
+    public String getConatainerCompletedCause(ContainerStatus containerStatus) 
{

Review comment:
       ```suggestion
       public String getContainerCompletedCause(ContainerStatus 
containerStatus) {
   ```

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -689,4 +690,54 @@ public void onStopContainerError(ContainerId containerId, 
Throwable throwable) {
                     throwable);
         }
     }
+
+    public String getConatainerCompletedCause(ContainerStatus containerStatus) 
{
+        String completeContainerMessage = containerStatus.getDiagnostics();
+        switch (containerStatus.getExitStatus()) {
+            case ContainerExitStatus.SUCCESS:
+                log.debug(
+                        "Executor for container {} exited because of a YARN 
event (e.g., "
+                                + "preemption) and not because of an error in 
the running job. Diagnostics: {}",
+                        containerStatus.getContainerId().toString(),
+                        completeContainerMessage);
+                break;
+            case ContainerExitStatus.PREEMPTED:
+                completeContainerMessage =
+                        String.format(
+                                "Container %s was preempted by yarn. 
Diagnostics: %s",
+                                containerStatus.getContainerId().toString(),
+                                completeContainerMessage);
+                break;
+            case ContainerExitStatus.INVALID:
+                completeContainerMessage =
+                        String.format(
+                                "Container %s was invalid. Diagnostics: %s",

Review comment:
       What does it mean that the container was invalid?

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -689,4 +690,54 @@ public void onStopContainerError(ContainerId containerId, 
Throwable throwable) {
                     throwable);
         }
     }
+
+    public String getConatainerCompletedCause(ContainerStatus containerStatus) 
{
+        String completeContainerMessage = containerStatus.getDiagnostics();
+        switch (containerStatus.getExitStatus()) {
+            case ContainerExitStatus.SUCCESS:
+                log.debug(
+                        "Executor for container {} exited because of a YARN 
event (e.g., "
+                                + "preemption) and not because of an error in 
the running job. Diagnostics: {}",
+                        containerStatus.getContainerId().toString(),
+                        completeContainerMessage);
+                break;
+            case ContainerExitStatus.PREEMPTED:
+                completeContainerMessage =
+                        String.format(
+                                "Container %s was preempted by yarn. 
Diagnostics: %s",
+                                containerStatus.getContainerId().toString(),
+                                completeContainerMessage);
+                break;
+            case ContainerExitStatus.INVALID:
+                completeContainerMessage =
+                        String.format(
+                                "Container %s was invalid. Diagnostics: %s",
+                                containerStatus.getContainerId().toString(),
+                                completeContainerMessage);
+                break;
+            case ContainerExitStatus.ABORTED:
+                completeContainerMessage =
+                        String.format(
+                                "Container %s killed by YARN for being 
released by the application or being 'lost' due to node failures etc. 
Diagnostics: %s",
+                                containerStatus.getContainerId().toString(),
+                                completeContainerMessage);
+                break;
+            case ContainerExitStatus.DISKS_FAILED:
+                completeContainerMessage =
+                        String.format(
+                                "Container %s is failed for threshold number 
of the nodemanager-local-directories or"
+                                        + " threshold number of the 
nodemanager-log-directories become bad. Diagnostics: %s",

Review comment:
       ```suggestion
                                   "Container %s is failed because threshold 
number of the nodemanager-local-directories or"
                                           + " threshold number of the 
nodemanager-log-directories have become bad. Diagnostics: %s",
   ```

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
##########
@@ -627,9 +605,12 @@ protected void 
validateReleaseResources(Collection<YarnWorkerNode> workerNodes)
             verifyFutureCompleted(releaseAssignedContainerFuture);
         }
 
-        ContainerStatus createTestingContainerCompletedStatus(final 
ContainerId containerId) {
+        ContainerStatus createTestingContainerCompletedStatus(ContainerStatus 
containerStatus) {
             return new TestingContainerStatus(
-                    containerId, ContainerState.COMPLETE, "Test exit", -1);
+                    containerStatus.getContainerId(),
+                    containerStatus.getState(),
+                    containerStatus.getDiagnostics(),
+                    containerStatus.getExitStatus());
         }

Review comment:
       I don't really understand this change. Can't we keep this utility method for 
creating a `ContainerStatus` whose state is `ContainerState.COMPLETE`?

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -689,4 +690,54 @@ public void onStopContainerError(ContainerId containerId, 
Throwable throwable) {
                     throwable);
         }
     }
+
+    public String getConatainerCompletedCause(ContainerStatus containerStatus) 
{
+        String completeContainerMessage = containerStatus.getDiagnostics();

Review comment:
       Nit: if we make this variable `final` and don't initialize it with 
`containerStatus.getDiagnostics()`, the compiler will check that every 
switch case assigns a proper value to it. With a mutable variable, this is 
not easily ensured.

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
##########
@@ -669,4 +650,116 @@ void prepareForTestStartTaskExecutorProcessVariousSpec(
                     (ignore1, ignore2) -> 
getDriver().requestResource(taskExecutorProcessSpec));
         }
     }
+
+    @Test
+    public void testGetConatainerCompletedCauseForSuccess() throws Exception {
+        ContainerStatus containerStatus =
+                ContainerStatusPBImpl.newInstance(
+                        testingContainer.getId(),
+                        ContainerState.COMPLETE,
+                        "success exit code",
+                        ContainerExitStatus.SUCCESS);
+        testingGetConatainerCompletedCause(containerStatus);
+    }
+
+    @Test
+    public void testGetConatainerCompletedCauseForPreempteds() throws 
Exception {
+        ContainerStatus containerStatus =
+                ContainerStatusPBImpl.newInstance(
+                        testingContainer.getId(),
+                        ContainerState.COMPLETE,
+                        "preempted exit code",
+                        ContainerExitStatus.PREEMPTED);
+        testingGetConatainerCompletedCause(containerStatus);
+    }
+
+    @Test
+    public void testGetConatainerCompletedCauseForInvalid() throws Exception {
+        ContainerStatus containerStatus =
+                ContainerStatusPBImpl.newInstance(
+                        testingContainer.getId(),
+                        ContainerState.COMPLETE,
+                        "invalid exit code",
+                        ContainerExitStatus.INVALID);
+        testingGetConatainerCompletedCause(containerStatus);
+    }
+
+    @Test
+    public void testGetConatainerCompletedCauseForAborted() throws Exception {
+        ContainerStatus containerStatus =
+                ContainerStatusPBImpl.newInstance(
+                        testingContainer.getId(),
+                        ContainerState.COMPLETE,
+                        "aborted exit code",
+                        ContainerExitStatus.ABORTED);
+        testingGetConatainerCompletedCause(containerStatus);
+    }
+
+    @Test
+    public void testGetConatainerCompletedCauseForDiskFailed() throws 
Exception {
+        ContainerStatus containerStatus =
+                ContainerStatusPBImpl.newInstance(
+                        testingContainer.getId(),
+                        ContainerState.COMPLETE,
+                        "disk failed exit code",
+                        ContainerExitStatus.DISKS_FAILED);
+        testingGetConatainerCompletedCause(containerStatus);
+    }
+
+    @Test
+    public void testGetConatainerCompletedCauseForUnknown() throws Exception {
+        ContainerStatus containerStatus =
+                ContainerStatusPBImpl.newInstance(
+                        testingContainer.getId(), ContainerState.COMPLETE, 
"unknown exit code", -1);
+        testingGetConatainerCompletedCause(containerStatus);
+    }
+
+    public void testingGetConatainerCompletedCause(ContainerStatus 
containerStatus)
+            throws Exception {
+        new Context() {
+            {
+                addContainerRequestFutures.add(new CompletableFuture<>());
+                addContainerRequestFutures.add(new CompletableFuture<>());
+
+                
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer(
+                        (ignored1, ignored2) ->
+                                addContainerRequestFutures
+                                        .get(
+                                                
addContainerRequestFuturesNumCompleted
+                                                        .getAndIncrement())
+                                        .complete(null));
+                resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer(
+                        (ignore1, ignore2) ->
+                                
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
+                runTest(
+                        () -> {
+                            runInMainThread(
+                                    () ->
+                                            getDriver()
+                                                    .requestResource(
+                                                            
testingTaskExecutorProcessSpec));
+                            
resourceManagerClientCallbackHandler.onContainersAllocated(
+                                    ImmutableList.of(testingContainer));
+                            ContainerStatus testingContainerStatus =
+                                    
createTestingContainerCompletedStatus(containerStatus);
+
+                            ResourceManagerDriver resourceManagerDriver = 
getDriver();
+                            if (resourceManagerDriver instanceof 
YarnResourceManagerDriver) {
+                                YarnResourceManagerDriver 
yarnResourceManagerDriver =
+                                        (YarnResourceManagerDriver) 
resourceManagerDriver;
+                                assertTrue(
+                                        yarnResourceManagerDriver
+                                                
.getConatainerCompletedCause(testingContainerStatus)
+                                                
.contains(testingContainerStatus.getDiagnostics()));

Review comment:
       I am wondering whether we could test this in isolation if 
`getConatainerCompletedCause` were a static utility method. Independently of 
that, I think it is a good improvement to test the overall 
`onContainersCompleted` method for the different `ContainerExitStatus` values.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -689,4 +690,54 @@ public void onStopContainerError(ContainerId containerId, 
Throwable throwable) {
                     throwable);
         }
     }
+
+    public String getConatainerCompletedCause(ContainerStatus containerStatus) 
{
+        String completeContainerMessage = containerStatus.getDiagnostics();
+        switch (containerStatus.getExitStatus()) {
+            case ContainerExitStatus.SUCCESS:
+                log.debug(
+                        "Executor for container {} exited because of a YARN 
event (e.g., "
+                                + "preemption) and not because of an error in 
the running job. Diagnostics: {}",
+                        containerStatus.getContainerId().toString(),
+                        completeContainerMessage);
+                break;
+            case ContainerExitStatus.PREEMPTED:
+                completeContainerMessage =
+                        String.format(
+                                "Container %s was preempted by yarn. 
Diagnostics: %s",
+                                containerStatus.getContainerId().toString(),
+                                completeContainerMessage);
+                break;
+            case ContainerExitStatus.INVALID:
+                completeContainerMessage =
+                        String.format(
+                                "Container %s was invalid. Diagnostics: %s",
+                                containerStatus.getContainerId().toString(),
+                                completeContainerMessage);
+                break;
+            case ContainerExitStatus.ABORTED:
+                completeContainerMessage =
+                        String.format(
+                                "Container %s killed by YARN for being 
released by the application or being 'lost' due to node failures etc. 
Diagnostics: %s",

Review comment:
       ```suggestion
                                   "Container %s killed by YARN or being 
released by the application or being 'lost' due to node failures etc. 
Diagnostics: %s",
   ```

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
##########
@@ -238,40 +240,10 @@ public void testOnContainerAllocated() throws Exception {
 
     @Test
     public void testOnContainerCompleted() throws Exception {
-        new Context() {
-            {
-                addContainerRequestFutures.add(new CompletableFuture<>());
-                addContainerRequestFutures.add(new CompletableFuture<>());
-
-                
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer(
-                        (ignored1, ignored2) ->
-                                addContainerRequestFutures
-                                        .get(
-                                                
addContainerRequestFuturesNumCompleted
-                                                        .getAndIncrement())
-                                        .complete(null));
-                resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer(
-                        (ignore1, ignore2) ->
-                                
getDriver().requestResource(testingTaskExecutorProcessSpec));
-
-                runTest(
-                        () -> {
-                            runInMainThread(
-                                    () ->
-                                            getDriver()
-                                                    .requestResource(
-                                                            
testingTaskExecutorProcessSpec));
-                            
resourceManagerClientCallbackHandler.onContainersAllocated(
-                                    ImmutableList.of(testingContainer));
-                            ContainerStatus testingContainerStatus =
-                                    
createTestingContainerCompletedStatus(testingContainer.getId());
-                            
resourceManagerClientCallbackHandler.onContainersCompleted(
-                                    ImmutableList.of(testingContainerStatus));
-
-                            
verifyFutureCompleted(addContainerRequestFutures.get(1));
-                        });
-            }
-        };
+        ContainerStatus containerStatus =
+                ContainerStatusPBImpl.newInstance(
+                        testingContainer.getId(), ContainerState.COMPLETE, 
"Test exit", -1);
+        testingGetConatainerCompletedCause(containerStatus);

Review comment:
       ```suggestion
           testingGetContainerCompletedCause(containerStatus);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to