This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 1578a296 [FLINK-37470] Improve JobManager Deployment / Pod error 
handling
1578a296 is described below

commit 1578a2960c987435b79113db5152ea4c57f4e9de
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Fri Mar 28 11:01:47 2025 +0100

    [FLINK-37470] Improve JobManager Deployment / Pod error handling
---
 flink-kubernetes-operator/pom.xml                  |   7 +
 .../exception/DeploymentFailedException.java       |  43 ++++--
 .../AbstractFlinkDeploymentObserver.java           |  37 +++---
 .../kubernetes/operator/utils/EventUtils.java      |  93 +++++++++++--
 .../flink/kubernetes/operator/TestUtils.java       |  21 ++-
 .../controller/FlinkDeploymentControllerTest.java  |  94 +------------
 .../exception/DeploymentFailedExceptionTest.java   |  69 ++++++++++
 .../deployment/ApplicationObserverTest.java        |  27 +++-
 .../kubernetes/operator/utils/PodErrorTest.java    | 148 +++++++++++++++++++++
 9 files changed, 407 insertions(+), 132 deletions(-)

diff --git a/flink-kubernetes-operator/pom.xml 
b/flink-kubernetes-operator/pom.xml
index b455fffa..cff4a43f 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -184,6 +184,13 @@ under the License.
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kube-api-test-client-inject</artifactId>
+            <version>${fabric8.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>com.squareup.okhttp3</groupId>
             <artifactId>mockwebserver</artifactId>
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
index a8f9abf0..9fc6143f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
@@ -17,15 +17,25 @@
 
 package org.apache.flink.kubernetes.operator.exception;
 
-import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
+
+import io.fabric8.kubernetes.api.model.ContainerStatus;
 import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
 
+import java.util.Optional;
+import java.util.Set;
+
 /** Exception to signal terminal deployment failure. */
 public class DeploymentFailedException extends RuntimeException {
 
-    public static final String REASON_CRASH_LOOP_BACKOFF = "CrashLoopBackOff";
-    public static final String REASON_IMAGE_PULL_BACKOFF = "ImagePullBackOff";
-    public static final String REASON_ERR_IMAGE_PULL = "ErrImagePull";
+    public static final Set<String> CONTAINER_ERROR_REASONS =
+            ImmutableSet.of(
+                    "CrashLoopBackOff",
+                    "ImagePullBackOff",
+                    "ErrImagePull",
+                    "RunContainerError",
+                    "CreateContainerConfigError",
+                    "OOMKilled");
 
     private static final long serialVersionUID = -1070179896083579221L;
 
@@ -36,11 +46,6 @@ public class DeploymentFailedException extends 
RuntimeException {
         this.reason = deployCondition.getReason();
     }
 
-    public DeploymentFailedException(ContainerStateWaiting stateWaiting) {
-        super(stateWaiting.getMessage());
-        this.reason = stateWaiting.getReason();
-    }
-
     public DeploymentFailedException(String message, String reason) {
         super(message);
         this.reason = reason;
@@ -49,4 +54,24 @@ public class DeploymentFailedException extends 
RuntimeException {
     public String getReason() {
         return reason;
     }
+
+    public static DeploymentFailedException forContainerStatus(ContainerStatus 
status) {
+        var waiting = status.getState().getWaiting();
+        var lastState = status.getLastState();
+        String message = null;
+        if ("CrashLoopBackOff".equals(waiting.getReason())
+                && lastState != null
+                && lastState.getTerminated() != null) {
+            message =
+                    Optional.ofNullable(lastState.getTerminated().getMessage())
+                            .map(err -> "CrashLoop - " + err)
+                            .orElse(null);
+        }
+
+        if (message == null) {
+            message = waiting.getMessage();
+        }
+        return new DeploymentFailedException(
+                String.format("[%s] %s", status.getName(), message), 
waiting.getReason());
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
index cd817ec8..54e1573b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
@@ -30,9 +30,9 @@ import 
org.apache.flink.kubernetes.operator.exception.MissingJobManagerException
 import 
org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
-import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
 import io.fabric8.kubernetes.api.model.ContainerStatus;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodList;
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
+import java.util.stream.Stream;
 
 /** Base observer for session and application clusters. */
 public abstract class AbstractFlinkDeploymentObserver
@@ -134,7 +134,7 @@ public abstract class AbstractFlinkDeploymentObserver
             try {
                 checkFailedCreate(status);
                 // checking the pod is expensive; only do it when the 
deployment isn't ready
-                checkContainerBackoff(ctx);
+                checkContainerErrors(ctx);
             } catch (DeploymentFailedException dfe) {
                 // throw only when not already in error status to allow for 
spec update
                 
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
@@ -171,21 +171,28 @@ public abstract class AbstractFlinkDeploymentObserver
         }
     }
 
-    private void checkContainerBackoff(FlinkResourceContext<FlinkDeployment> 
ctx) {
+    private void checkContainerErrors(FlinkResourceContext<FlinkDeployment> 
ctx) {
         PodList jmPods =
                 ctx.getFlinkService().getJmPodList(ctx.getResource(), 
ctx.getObserveConfig());
         for (Pod pod : jmPods.getItems()) {
-            for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) {
-                ContainerStateWaiting csw = cs.getState().getWaiting();
-                if (csw != null
-                        && Set.of(
-                                        
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF,
-                                        
DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF,
-                                        
DeploymentFailedException.REASON_ERR_IMAGE_PULL)
-                                .contains(csw.getReason())) {
-                    throw new DeploymentFailedException(csw);
-                }
-            }
+            var podStatus = pod.getStatus();
+            Stream.concat(
+                            podStatus.getContainerStatuses().stream(),
+                            podStatus.getInitContainerStatuses().stream())
+                    
.forEach(AbstractFlinkDeploymentObserver::checkContainerError);
+
+            // No obvious errors were found, check for volume mount issues
+            EventUtils.checkForVolumeMountErrors(ctx.getKubernetesClient(), 
pod);
+        }
+    }
+
+    private static void checkContainerError(ContainerStatus cs) {
+        if (cs.getState() == null || cs.getState().getWaiting() == null) {
+            return;
+        }
+        if (DeploymentFailedException.CONTAINER_ERROR_REASONS.contains(
+                cs.getState().getWaiting().getReason())) {
+            throw DeploymentFailedException.forContainerStatus(cs);
         }
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index b515cdda..fb0dbfc6 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -17,11 +17,16 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+
 import io.fabric8.kubernetes.api.model.Event;
 import io.fabric8.kubernetes.api.model.EventBuilder;
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder;
+import io.fabric8.kubernetes.api.model.ObjectReference;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodCondition;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import org.slf4j.Logger;
@@ -32,10 +37,14 @@ import javax.annotation.Nullable;
 import java.net.HttpURLConnection;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 /**
  * The util to generate an event for the target resource. It is copied from
@@ -182,13 +191,7 @@ public class EventUtils {
             String eventName) {
         return new EventBuilder()
                 .withApiVersion("v1")
-                .withInvolvedObject(
-                        new ObjectReferenceBuilder()
-                                .withKind(target.getKind())
-                                .withUid(target.getMetadata().getUid())
-                                .withName(target.getMetadata().getName())
-                                
.withNamespace(target.getMetadata().getNamespace())
-                                .build())
+                .withInvolvedObject(getObjectReference(target))
                 .withType(type.name())
                 .withReason(reason)
                 .withFirstTimestamp(Instant.now().toString())
@@ -235,4 +238,78 @@ public class EventUtils {
         }
         return Optional.empty();
     }
+
+    private static List<Event> getPodEvents(KubernetesClient client, Pod pod) {
+        var ref = getObjectReference(pod);
+
+        var eventList =
+                client.v1()
+                        .events()
+                        .inNamespace(pod.getMetadata().getNamespace())
+                        .withInvolvedObject(ref)
+                        .list();
+
+        if (eventList == null) {
+            return new ArrayList<>();
+        }
+
+        var items = eventList.getItems();
+        if (items == null) {
+            return new ArrayList<>();
+        }
+        return items;
+    }
+
+    @VisibleForTesting
+    protected static ObjectReference getObjectReference(HasMetadata resource) {
+        var ref = new ObjectReference();
+        ref.setApiVersion(resource.getApiVersion());
+        ref.setKind(resource.getKind());
+        ref.setName(resource.getMetadata().getName());
+        ref.setNamespace(resource.getMetadata().getNamespace());
+        ref.setUid(resource.getMetadata().getUid());
+        return ref;
+    }
+
+    /**
+     * Check whether the pod is stuck in the volume mount stage and, if so, throw a {@link
+     * DeploymentFailedException} with the reason and message of the FailedMount event.
+     *
+     * @param client Kubernetes client
+     * @param pod Pod to be checked
+     */
+    public static void checkForVolumeMountErrors(KubernetesClient client, Pod 
pod) {
+        var conditions = pod.getStatus().getConditions();
+        if (conditions == null) {
+            return;
+        }
+        var conditionMap =
+                conditions.stream()
+                        .collect(Collectors.toMap(PodCondition::getType, 
Function.identity()));
+
+        // Prefer PodReadyToStartContainers (only present in k8s 1.29+) and
+        // fall back to Initialized when it is not available
+        boolean failedInitialization =
+                checkStatusWasAlways(
+                        pod,
+                        conditionMap.getOrDefault(
+                                "PodReadyToStartContainers", 
conditionMap.get("Initialized")),
+                        "False");
+
+        boolean notReady = checkStatusWasAlways(pod, 
conditionMap.get("Ready"), "False");
+
+        if (notReady && failedInitialization) {
+            getPodEvents(client, pod).stream()
+                    .filter(e -> e.getReason().equals("FailedMount"))
+                    .findAny()
+                    .ifPresent(
+                            e -> {
+                                throw new 
DeploymentFailedException(e.getMessage(), e.getReason());
+                            });
+        }
+    }
+
+    private static boolean checkStatusWasAlways(Pod pod, PodCondition 
condition, String status) {
+        return condition != null && condition.getStatus().equals(status);
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index d9492bfd..e3a3ae25 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -97,6 +97,24 @@ public class TestUtils extends BaseTestUtils {
     public static PodList createFailedPodList(String crashLoopMessage, String 
reason) {
         ContainerStatus cs =
                 new ContainerStatusBuilder()
+                        .withName("c1")
+                        .withNewState()
+                        .withNewWaiting()
+                        .withReason(reason)
+                        .withMessage(crashLoopMessage)
+                        .endWaiting()
+                        .endState()
+                        .build();
+
+        Pod pod = getTestPod("host", "apiVersion", Collections.emptyList());
+        pod.setStatus(new 
PodStatusBuilder().withContainerStatuses(cs).build());
+        return new PodListBuilder().withItems(pod).build();
+    }
+
+    public static PodList createFailedInitContainerPodList(String 
crashLoopMessage, String reason) {
+        ContainerStatus cs =
+                new ContainerStatusBuilder()
+                        .withName("c1")
                         .withNewState()
                         .withNewWaiting()
                         .withReason(reason)
@@ -108,7 +126,8 @@ public class TestUtils extends BaseTestUtils {
         Pod pod = getTestPod("host", "apiVersion", Collections.emptyList());
         pod.setStatus(
                 new PodStatusBuilder()
-                        .withContainerStatuses(Collections.singletonList(cs))
+                        .withContainerStatuses(new 
ContainerStatusBuilder().withReady().build())
+                        .withInitContainerStatuses(cs)
                         .build());
         return new PodListBuilder().withItems(pod).build();
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 8c3bd33f..2384fb4d 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -35,7 +35,6 @@ import 
org.apache.flink.kubernetes.operator.api.status.JobStatus;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.api.status.TaskManagerInfo;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -59,7 +58,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
 import java.util.Map;
@@ -299,94 +297,6 @@ public class FlinkDeploymentControllerTest {
                 updateControl.getScheduleDelay().get());
     }
 
-    @ParameterizedTest()
-    @ValueSource(
-            strings = {
-                DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF,
-                DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF,
-                DeploymentFailedException.REASON_ERR_IMAGE_PULL
-            })
-    public void verifyInProgressDeploymentWithError(String reason) throws 
Exception {
-        String crashLoopMessage = "container fails";
-
-        var submittedEventValidatingResponseProvider =
-                new TestUtils.ValidatingResponseProvider<>(
-                        mockedEvent,
-                        r ->
-                                assertTrue(
-                                        r.getBody()
-                                                .readUtf8()
-                                                .contains(
-                                                        
AbstractFlinkResourceReconciler
-                                                                .MSG_SUBMIT)));
-        mockServer
-                .expect()
-                .post()
-                .withPath("/api/v1/namespaces/flink-operator-test/events")
-                .andReply(submittedEventValidatingResponseProvider)
-                .once();
-
-        var validatingResponseProvider =
-                new TestUtils.ValidatingResponseProvider<>(
-                        mockedEvent,
-                        r -> {
-                            String recordedRequestBody = 
r.getBody().readUtf8();
-                            assertTrue(recordedRequestBody.contains(reason));
-                            
assertTrue(recordedRequestBody.contains(crashLoopMessage));
-                        });
-        mockServer
-                .expect()
-                .post()
-                .withPath("/api/v1/namespaces/flink-operator-test/events")
-                .andReply(validatingResponseProvider)
-                .once();
-
-        
flinkService.setPodList(TestUtils.createFailedPodList(crashLoopMessage, 
reason));
-
-        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
-        UpdateControl<FlinkDeployment> updateControl;
-
-        testController.reconcile(appCluster, context);
-        updateControl =
-                testController.reconcile(
-                        appCluster,
-                        
TestUtils.createContextWithInProgressDeployment(kubernetesClient));
-        submittedEventValidatingResponseProvider.assertValidated();
-        assertFalse(updateControl.isUpdateStatus());
-        assertEquals(
-                Optional.of(
-                        
configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
-                updateControl.getScheduleDelay());
-
-        assertEquals(
-                JobManagerDeploymentStatus.ERROR,
-                appCluster.getStatus().getJobManagerDeploymentStatus());
-        assertEquals(
-                org.apache.flink.api.common.JobStatus.RECONCILING,
-                appCluster.getStatus().getJobStatus().getState());
-
-        // Validate status status
-        assertNotNull(appCluster.getStatus().getError());
-
-        // next cycle should not create another event
-        updateControl =
-                testController.reconcile(
-                        appCluster,
-                        
TestUtils.createContextWithFailedJobManagerDeployment(kubernetesClient));
-        assertEquals(
-                JobManagerDeploymentStatus.ERROR,
-                appCluster.getStatus().getJobManagerDeploymentStatus());
-        assertFalse(updateControl.isUpdateStatus());
-        assertEquals(
-                ReconciliationUtils.rescheduleAfter(
-                                JobManagerDeploymentStatus.READY,
-                                appCluster,
-                                configManager.getOperatorConfiguration())
-                        .toMillis(),
-                updateControl.getScheduleDelay().get());
-        validatingResponseProvider.assertValidated();
-    }
-
     @ParameterizedTest
     
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion 
flinkVersion) throws Exception {
@@ -915,9 +825,7 @@ public class FlinkDeploymentControllerTest {
     @Test
     public void testSuccessfulObservationShouldClearErrors() throws Exception {
         final String crashLoopMessage = "deploy errors";
-        flinkService.setPodList(
-                TestUtils.createFailedPodList(
-                        crashLoopMessage, 
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF));
+        
flinkService.setPodList(TestUtils.createFailedPodList(crashLoopMessage, 
"ErrImagePull"));
 
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedExceptionTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedExceptionTest.java
new file mode 100644
index 00000000..2437d3b9
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedExceptionTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.exception;
+
+import io.fabric8.kubernetes.api.model.ContainerState;
+import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
+import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
+import io.fabric8.kubernetes.api.model.ContainerStatus;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link DeploymentFailedException}. */
+public class DeploymentFailedExceptionTest {
+
+    @Test
+    public void testErrorFromContainerStatus() {
+        var containerStatus = new ContainerStatus();
+        containerStatus.setName("c1");
+        var state = new ContainerState();
+        var waiting = new ContainerStateWaiting();
+        waiting.setMessage("msg");
+        waiting.setReason("r");
+        state.setWaiting(waiting);
+
+        containerStatus.setState(state);
+
+        var ex = DeploymentFailedException.forContainerStatus(containerStatus);
+        assertEquals("[c1] msg", ex.getMessage());
+        assertEquals("r", ex.getReason());
+
+        waiting.setReason("CrashLoopBackOff");
+        waiting.setMessage("backing off");
+        ex = DeploymentFailedException.forContainerStatus(containerStatus);
+        assertEquals("[c1] backing off", ex.getMessage());
+        assertEquals("CrashLoopBackOff", ex.getReason());
+
+        // Last state set but not terminated
+        var lastState = new ContainerState();
+        containerStatus.setLastState(lastState);
+
+        ex = DeploymentFailedException.forContainerStatus(containerStatus);
+        assertEquals("[c1] backing off", ex.getMessage());
+        assertEquals("CrashLoopBackOff", ex.getReason());
+
+        var terminated = new ContainerStateTerminated();
+        terminated.setMessage("crash");
+        lastState.setTerminated(terminated);
+
+        ex = DeploymentFailedException.forContainerStatus(containerStatus);
+        assertEquals("[c1] CrashLoop - crash", ex.getMessage());
+        assertEquals("CrashLoopBackOff", ex.getReason());
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index aa4898d8..68233d39 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -48,12 +48,16 @@ import io.javaoperatorsdk.operator.api.reconciler.Context;
 import lombok.Getter;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getCheckpointInfo;
 import static 
org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getJobStatus;
@@ -753,8 +757,9 @@ public class ApplicationObserverTest extends 
OperatorTestBase {
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
     }
 
-    @Test
-    public void observeListJobsError() {
+    @ParameterizedTest
+    @MethodSource("containerFailureReasons")
+    public void observeListJobsError(String reason, boolean initError) {
         bringToReadyStatus(deployment);
         observer.observe(deployment, readyContext);
         assertEquals(
@@ -762,9 +767,12 @@ public class ApplicationObserverTest extends 
OperatorTestBase {
                 deployment.getStatus().getJobManagerDeploymentStatus());
         // simulate deployment failure
         String podFailedMessage = "list jobs error";
-        flinkService.setPodList(
-                TestUtils.createFailedPodList(
-                        podFailedMessage, 
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF));
+        if (initError) {
+            flinkService.setPodList(
+                    
TestUtils.createFailedInitContainerPodList(podFailedMessage, reason));
+        } else {
+            
flinkService.setPodList(TestUtils.createFailedPodList(podFailedMessage, 
reason));
+        }
         flinkService.setPortReady(false);
         Exception exception =
                 assertThrows(
@@ -774,7 +782,14 @@ public class ApplicationObserverTest extends 
OperatorTestBase {
                                         deployment,
                                         
TestUtils.createContextWithInProgressDeployment(
                                                 kubernetesClient)));
-        assertEquals(podFailedMessage, exception.getMessage());
+        assertEquals("[c1] " + podFailedMessage, exception.getMessage());
+    }
+
+    private static Stream<Arguments> containerFailureReasons() {
+        return DeploymentFailedException.CONTAINER_ERROR_REASONS.stream()
+                .flatMap(
+                        reason ->
+                                Stream.of(Arguments.of(reason, true), 
Arguments.of(reason, false)));
     }
 
     @Test
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java
new file mode 100644
index 00000000..d4e57809
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+
+import io.fabric8.kubeapitest.junit.EnableKubeAPIServer;
+import io.fabric8.kubernetes.api.model.EventBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.PodCondition;
+import io.fabric8.kubernetes.api.model.PodConditionBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for {@link EventUtils}. */
+@EnableKubeAPIServer
+public class PodErrorTest {
+
+    static KubernetesClient client;
+
+    @Test
+    public void testVolumeMountErrors() {
+        var pod =
+                new PodBuilder()
+                        .withNewMetadata()
+                        .withName("test")
+                        .withNamespace("default")
+                        .endMetadata()
+                        .withNewStatus()
+                        .endStatus()
+                        .build();
+
+        // No conditions, no error expected
+        EventUtils.checkForVolumeMountErrors(client, pod);
+
+        var conditions = new ArrayList<PodCondition>();
+        pod.getStatus().setConditions(conditions);
+
+        // No conditions, no error expected
+        EventUtils.checkForVolumeMountErrors(client, pod);
+
+        // Create error events
+        createPodEvent("e1", "reason1", "msg1", pod);
+        createPodEvent("e2", "FailedMount", "mountErr", pod);
+
+        var conditionMap = new HashMap<String, String>();
+
+        // Pod initialized completely, shouldn't check events
+        conditionMap.put("Initialized", "True");
+        conditionMap.put("Ready", "False");
+
+        conditions.clear();
+        conditionMap.forEach(
+                (t, s) ->
+                        conditions.add(
+                                new PodConditionBuilder().withType(t).withStatus(s).build()));
+        EventUtils.checkForVolumeMountErrors(client, pod);
+
+        // Pod initialized completely, shouldn't check events
+        conditionMap.put("PodReadyToStartContainers", "True");
+        conditionMap.put("Initialized", "False");
+
+        conditions.clear();
+        conditionMap.forEach(
+                (t, s) ->
+                        conditions.add(
+                                new PodConditionBuilder().withType(t).withStatus(s).build()));
+        EventUtils.checkForVolumeMountErrors(client, pod);
+
+        // Check event only when not ready to start
+        conditionMap.put("PodReadyToStartContainers", "False");
+        conditions.clear();
+        conditionMap.forEach(
+                (t, s) ->
+                        conditions.add(
+                                new PodConditionBuilder().withType(t).withStatus(s).build()));
+
+        try {
+            EventUtils.checkForVolumeMountErrors(client, pod);
+            fail("Exception not thrown");
+        } catch (DeploymentFailedException dfe) {
+            assertEquals("FailedMount", dfe.getReason());
+            assertEquals("mountErr", dfe.getMessage());
+        }
+
+        // Old kubernetes without PodReadyToStartContainers
+        conditionMap.remove("PodReadyToStartContainers");
+        conditionMap.put("Initialized", "False");
+        conditions.clear();
+        conditionMap.forEach(
+                (t, s) ->
+                        conditions.add(
+                                new PodConditionBuilder().withType(t).withStatus(s).build()));
+
+        try {
+            EventUtils.checkForVolumeMountErrors(client, pod);
+            fail("Exception not thrown");
+        } catch (DeploymentFailedException dfe) {
+            assertEquals("FailedMount", dfe.getReason());
+            assertEquals("mountErr", dfe.getMessage());
+        }
+    }
+
+    private void createPodEvent(String name, String reason, String msg, Pod pod) {
+        var event =
+                new EventBuilder()
+                        .withApiVersion("v1")
+                        .withInvolvedObject(EventUtils.getObjectReference(pod))
+                        .withType("type")
+                        .withReason(reason)
+                        .withFirstTimestamp(Instant.now().toString())
+                        .withLastTimestamp(Instant.now().toString())
+                        .withNewSource()
+                        .withComponent("pod")
+                        .endSource()
+                        .withCount(1)
+                        .withMessage(msg)
+                        .withNewMetadata()
+                        .withName(name)
+                        .withNamespace(pod.getMetadata().getNamespace())
+                        .endMetadata()
+                        .build();
+        client.resource(event).create();
+    }
+}


Reply via email to