This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new ddd7c927 [FLINK-29744] Throw DeploymentFailedException on ImagePullBackOff
ddd7c927 is described below
commit ddd7c927481b52fc3422ef6534760d8d436428e7
Author: Matyas Orhidi <[email protected]>
AuthorDate: Mon Oct 24 17:19:23 2022 -0700
[FLINK-29744] Throw DeploymentFailedException on ImagePullBackOff
---
.../exception/DeploymentFailedException.java | 1 +
.../AbstractFlinkDeploymentObserver.java | 11 ++-
.../flink/kubernetes/operator/TestUtils.java | 5 +-
.../controller/FlinkDeploymentControllerTest.java | 97 ++++++++++++++++++++--
.../deployment/ApplicationObserverTest.java | 4 +-
5 files changed, 103 insertions(+), 15 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
index 85fc604b..7721bf65 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java
@@ -24,6 +24,7 @@ import
io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
public class DeploymentFailedException extends RuntimeException {
public static final String REASON_CRASH_LOOP_BACKOFF = "CrashLoopBackOff";
+ public static final String REASON_IMAGE_PULL_BACKOFF = "ImagePullBackOff";
private static final long serialVersionUID = -1070179896083579221L;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
index d13068d6..62ace0a2 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
/** Base observer for session and application clusters. */
public abstract class AbstractFlinkDeploymentObserver
@@ -145,7 +146,7 @@ public abstract class AbstractFlinkDeploymentObserver
try {
checkFailedCreate(status);
// checking the pod is expensive; only do it when the deployment isn't ready
- checkCrashLoopBackoff(flinkApp, effectiveConfig);
+ checkContainerBackoff(flinkApp, effectiveConfig);
} catch (DeploymentFailedException dfe) {
// throw only when not already in error status to allow for spec update
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
@@ -179,14 +180,16 @@ public abstract class AbstractFlinkDeploymentObserver
}
}
- private void checkCrashLoopBackoff(FlinkDeployment flinkApp, Configuration
effectiveConfig) {
+ private void checkContainerBackoff(FlinkDeployment flinkApp, Configuration
effectiveConfig) {
PodList jmPods = flinkService.getJmPodList(flinkApp, effectiveConfig);
for (Pod pod : jmPods.getItems()) {
for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) {
ContainerStateWaiting csw = cs.getState().getWaiting();
if (csw != null
- &&
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF.equals(
- csw.getReason())) {
+ && Set.of(
+
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF,
+
DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF)
+ .contains(csw.getReason())) {
throw new DeploymentFailedException(csw);
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index ebbb35c9..44fe90b2 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -38,7 +38,6 @@ import
org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import
org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
@@ -219,12 +218,12 @@ public class TestUtils {
return pod;
}
- public static PodList createFailedPodList(String crashLoopMessage) {
+ public static PodList createFailedPodList(String crashLoopMessage, String
reason) {
ContainerStatus cs =
new ContainerStatusBuilder()
.withNewState()
.withNewWaiting()
-
.withReason(DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)
+ .withReason(reason)
.withMessage(crashLoopMessage)
.endWaiting()
.endState()
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 9cc05231..6728e0cc 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -56,6 +56,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.List;
@@ -297,8 +298,90 @@ public class FlinkDeploymentControllerTest {
updateControl.getScheduleDelay().get());
}
- @Test
- public void verifyInProgressDeploymentWithCrashLoopBackoff() throws
Exception {
+ @ParameterizedTest()
+ @ValueSource(
+ strings = {
+ DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF,
+ DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF
+ })
+ public void verifyInProgressDeploymentWithBackoff(String reason) throws
Exception {
+ String crashLoopMessage = "container fails";
+
+ var submittedEventValidatingResponseProvider =
+ new TestUtils.ValidatingResponseProvider<>(
+ new
EventBuilder().withNewMetadata().endMetadata().build(),
+ r ->
+ assertTrue(
+ r.getBody()
+ .readUtf8()
+ .contains(
+
AbstractFlinkResourceReconciler
+ .MSG_SUBMIT)));
+ mockServer
+ .expect()
+ .post()
+ .withPath("/api/v1/namespaces/flink-operator-test/events")
+ .andReply(submittedEventValidatingResponseProvider)
+ .once();
+
+ var validatingResponseProvider =
+ new TestUtils.ValidatingResponseProvider<>(
+ new
EventBuilder().withNewMetadata().endMetadata().build(),
+ r -> {
+ String recordedRequestBody =
r.getBody().readUtf8();
+ assertTrue(recordedRequestBody.contains(reason));
+
assertTrue(recordedRequestBody.contains(crashLoopMessage));
+ });
+ mockServer
+ .expect()
+ .post()
+ .withPath("/api/v1/namespaces/flink-operator-test/events")
+ .andReply(validatingResponseProvider)
+ .once();
+
+
flinkService.setJmPodList(TestUtils.createFailedPodList(crashLoopMessage,
reason));
+
+ FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+ UpdateControl<FlinkDeployment> updateControl;
+
+ testController.reconcile(appCluster, context);
+ updateControl =
+ testController.reconcile(
+ appCluster,
TestUtils.createContextWithInProgressDeployment());
+ submittedEventValidatingResponseProvider.assertValidated();
+ assertFalse(updateControl.isUpdateStatus());
+ assertEquals(
+ Optional.of(
+
configManager.getOperatorConfiguration().getReconcileInterval().toMillis()),
+ updateControl.getScheduleDelay());
+
+ assertEquals(
+ JobManagerDeploymentStatus.ERROR,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+ appCluster.getStatus().getJobStatus().getState());
+
+ // Validate that the error status is set
+ assertNotNull(appCluster.getStatus().getError());
+
+ // next cycle should not create another event
+ updateControl =
+ testController.reconcile(
+ appCluster,
TestUtils.createContextWithFailedJobManagerDeployment());
+ assertEquals(
+ JobManagerDeploymentStatus.ERROR,
+ appCluster.getStatus().getJobManagerDeploymentStatus());
+ assertFalse(updateControl.isUpdateStatus());
+ assertEquals(
+ JobManagerDeploymentStatus.READY
+ .rescheduleAfter(appCluster,
configManager.getOperatorConfiguration())
+ .toMillis(),
+ updateControl.getScheduleDelay().get());
+ validatingResponseProvider.assertValidated();
+ }
+
+ public void verifyInProgressDeployment(String reason) throws Exception {
String crashLoopMessage = "container fails";
var submittedEventValidatingResponseProvider =
@@ -323,9 +406,7 @@ public class FlinkDeploymentControllerTest {
new
EventBuilder().withNewMetadata().endMetadata().build(),
r -> {
String recordedRequestBody =
r.getBody().readUtf8();
- assertTrue(
- recordedRequestBody.contains(
-
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF));
+ assertTrue(recordedRequestBody.contains(reason));
assertTrue(recordedRequestBody.contains(crashLoopMessage));
});
mockServer
@@ -335,7 +416,7 @@ public class FlinkDeploymentControllerTest {
.andReply(validatingResponseProvider)
.once();
-
flinkService.setJmPodList(TestUtils.createFailedPodList(crashLoopMessage));
+
flinkService.setJmPodList(TestUtils.createFailedPodList(crashLoopMessage,
reason));
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
UpdateControl<FlinkDeployment> updateControl;
@@ -852,7 +933,9 @@ public class FlinkDeploymentControllerTest {
@Test
public void testSuccessfulObservationShouldClearErrors() throws Exception {
final String crashLoopMessage = "deploy errors";
-
flinkService.setJmPodList(TestUtils.createFailedPodList(crashLoopMessage));
+ flinkService.setJmPodList(
+ TestUtils.createFailedPodList(
+ crashLoopMessage,
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF));
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 44570f39..cb472799 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -624,7 +624,9 @@ public class ApplicationObserverTest {
deployment.getStatus().getJobManagerDeploymentStatus());
// simulate deployment failure
String podFailedMessage = "list jobs error";
-
flinkService.setJmPodList(TestUtils.createFailedPodList(podFailedMessage));
+ flinkService.setJmPodList(
+ TestUtils.createFailedPodList(
+ podFailedMessage,
DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF));
flinkService.setPortReady(false);
Exception exception =
assertThrows(