This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new ddd7c927 [FLINK-29744] Throw DeploymentFailedException on ImagePullBackOff
ddd7c927 is described below

commit ddd7c927481b52fc3422ef6534760d8d436428e7
Author: Matyas Orhidi <[email protected]>
AuthorDate: Mon Oct 24 17:19:23 2022 -0700

    [FLINK-29744] Throw DeploymentFailedException on ImagePullBackOff
---
 .../exception/DeploymentFailedException.java       |  1 +
 .../AbstractFlinkDeploymentObserver.java           | 11 ++-
 .../flink/kubernetes/operator/TestUtils.java       |  5 +-
 .../controller/FlinkDeploymentControllerTest.java  | 97 ++++++++++++++++++++--
 .../deployment/ApplicationObserverTest.java        |  4 +-
 5 files changed, 103 insertions(+), 15 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
index 85fc604b..7721bf65 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
@@ -24,6 +24,7 @@ import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
 public class DeploymentFailedException extends RuntimeException {
 
     public static final String REASON_CRASH_LOOP_BACKOFF = "CrashLoopBackOff";
+    public static final String REASON_IMAGE_PULL_BACKOFF = "ImagePullBackOff";
 
     private static final long serialVersionUID = -1070179896083579221L;
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
index d13068d6..62ace0a2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 /** Base observer for session and application clusters. */
 public abstract class AbstractFlinkDeploymentObserver
@@ -145,7 +146,7 @@ public abstract class AbstractFlinkDeploymentObserver
             try {
                 checkFailedCreate(status);
                 // checking the pod is expensive; only do it when the 
deployment isn't ready
-                checkCrashLoopBackoff(flinkApp, effectiveConfig);
+                checkContainerBackoff(flinkApp, effectiveConfig);
             } catch (DeploymentFailedException dfe) {
                 // throw only when not already in error status to allow for 
spec update
                 
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
@@ -179,14 +180,16 @@ public abstract class AbstractFlinkDeploymentObserver
         }
     }
 
-    private void checkCrashLoopBackoff(FlinkDeployment flinkApp, Configuration 
effectiveConfig) {
+    private void checkContainerBackoff(FlinkDeployment flinkApp, Configuration 
effectiveConfig) {
         PodList jmPods = flinkService.getJmPodList(flinkApp, effectiveConfig);
         for (Pod pod : jmPods.getItems()) {
             for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) {
                 ContainerStateWaiting csw = cs.getState().getWaiting();
                 if (csw != null
-                        && 
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF.equals(
-                                csw.getReason())) {
+                        && Set.of(
+                                        
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF,
+                                        
DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF)
+                                .contains(csw.getReason())) {
                     throw new DeploymentFailedException(csw);
                 }
             }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index ebbb35c9..44fe90b2 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -38,7 +38,6 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import 
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
@@ -219,12 +218,12 @@ public class TestUtils {
         return pod;
     }
 
-    public static PodList createFailedPodList(String crashLoopMessage) {
+    public static PodList createFailedPodList(String crashLoopMessage, String 
reason) {
         ContainerStatus cs =
                 new ContainerStatusBuilder()
                         .withNewState()
                         .withNewWaiting()
-                        
.withReason(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)
+                        .withReason(reason)
                         .withMessage(crashLoopMessage)
                         .endWaiting()
                         .endState()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 9cc05231..6728e0cc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -56,6 +56,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -297,8 +298,90 @@ public class FlinkDeploymentControllerTest {
                 updateControl.getScheduleDelay().get());
     }
 
-    @Test
-    public void verifyInProgressDeploymentWithCrashLoopBackoff() throws 
Exception {
+    @ParameterizedTest()
+    @ValueSource(
+            strings = {
+                DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF,
+                DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF
+            })
+    public void verifyInProgressDeploymentWithBackoff(String reason) throws 
Exception {
+        String crashLoopMessage = "container fails";
+
+        var submittedEventValidatingResponseProvider =
+                new TestUtils.ValidatingResponseProvider<>(
+                        new 
EventBuilder().withNewMetadata().endMetadata().build(),
+                        r ->
+                                assertTrue(
+                                        r.getBody()
+                                                .readUtf8()
+                                                .contains(
+                                                        
AbstractFlinkResourceReconciler
+                                                                .MSG_SUBMIT)));
+        mockServer
+                .expect()
+                .post()
+                .withPath("/api/v1/namespaces/flink-operator-test/events")
+                .andReply(submittedEventValidatingResponseProvider)
+                .once();
+
+        var validatingResponseProvider =
+                new TestUtils.ValidatingResponseProvider<>(
+                        new 
EventBuilder().withNewMetadata().endMetadata().build(),
+                        r -> {
+                            String recordedRequestBody = 
r.getBody().readUtf8();
+                            assertTrue(recordedRequestBody.contains(reason));
+                            
assertTrue(recordedRequestBody.contains(crashLoopMessage));
+                        });
+        mockServer
+                .expect()
+                .post()
+                .withPath("/api/v1/namespaces/flink-operator-test/events")
+                .andReply(validatingResponseProvider)
+                .once();
+
+        
flinkService.setJmPodList(TestUtils.createFailedPodList(crashLoopMessage, 
reason));
+
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        UpdateControl<FlinkDeployment> updateControl;
+
+        testController.reconcile(appCluster, context);
+        updateControl =
+                testController.reconcile(
+                        appCluster, 
TestUtils.createContextWithInProgressDeployment());
+        submittedEventValidatingResponseProvider.assertValidated();
+        assertFalse(updateControl.isUpdateStatus());
+        assertEquals(
+                Optional.of(
+                        
configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
+                updateControl.getScheduleDelay());
+
+        assertEquals(
+                JobManagerDeploymentStatus.ERROR,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+                appCluster.getStatus().getJobStatus().getState());
+
+        // Validate error status
+        assertNotNull(appCluster.getStatus().getError());
+
+        // next cycle should not create another event
+        updateControl =
+                testController.reconcile(
+                        appCluster, 
TestUtils.createContextWithFailedJobManagerDeployment());
+        assertEquals(
+                JobManagerDeploymentStatus.ERROR,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+        assertFalse(updateControl.isUpdateStatus());
+        assertEquals(
+                JobManagerDeploymentStatus.READY
+                        .rescheduleAfter(appCluster, 
configManager.getOperatorConfiguration())
+                        .toMillis(),
+                updateControl.getScheduleDelay().get());
+        validatingResponseProvider.assertValidated();
+    }
+
+    public void verifyInProgressDeployment(String reason) throws Exception {
         String crashLoopMessage = "container fails";
 
         var submittedEventValidatingResponseProvider =
@@ -323,9 +406,7 @@ public class FlinkDeploymentControllerTest {
                         new 
EventBuilder().withNewMetadata().endMetadata().build(),
                         r -> {
                             String recordedRequestBody = 
r.getBody().readUtf8();
-                            assertTrue(
-                                    recordedRequestBody.contains(
-                                            
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF));
+                            assertTrue(recordedRequestBody.contains(reason));
                             
assertTrue(recordedRequestBody.contains(crashLoopMessage));
                         });
         mockServer
@@ -335,7 +416,7 @@ public class FlinkDeploymentControllerTest {
                 .andReply(validatingResponseProvider)
                 .once();
 
-        
flinkService.setJmPodList(TestUtils.createFailedPodList(crashLoopMessage));
+        
flinkService.setJmPodList(TestUtils.createFailedPodList(crashLoopMessage, 
reason));
 
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
         UpdateControl<FlinkDeployment> updateControl;
@@ -852,7 +933,9 @@ public class FlinkDeploymentControllerTest {
     @Test
     public void testSuccessfulObservationShouldClearErrors() throws Exception {
         final String crashLoopMessage = "deploy errors";
-        
flinkService.setJmPodList(TestUtils.createFailedPodList(crashLoopMessage));
+        flinkService.setJmPodList(
+                TestUtils.createFailedPodList(
+                        crashLoopMessage, 
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF));
 
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 44570f39..cb472799 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -624,7 +624,9 @@ public class ApplicationObserverTest {
                 deployment.getStatus().getJobManagerDeploymentStatus());
         // simulate deployment failure
         String podFailedMessage = "list jobs error";
-        
flinkService.setJmPodList(TestUtils.createFailedPodList(podFailedMessage));
+        flinkService.setJmPodList(
+                TestUtils.createFailedPodList(
+                        podFailedMessage, 
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF));
         flinkService.setPortReady(false);
         Exception exception =
                 assertThrows(

Reply via email to