This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 1578a296 [FLINK-37470] Improve JobManager Deployment / Pod error handling 1578a296 is described below commit 1578a2960c987435b79113db5152ea4c57f4e9de Author: Gyula Fora <g_f...@apple.com> AuthorDate: Fri Mar 28 11:01:47 2025 +0100 [FLINK-37470] Improve JobManager Deployment / Pod error handling --- flink-kubernetes-operator/pom.xml | 7 + .../exception/DeploymentFailedException.java | 43 ++++-- .../AbstractFlinkDeploymentObserver.java | 37 +++--- .../kubernetes/operator/utils/EventUtils.java | 93 +++++++++++-- .../flink/kubernetes/operator/TestUtils.java | 21 ++- .../controller/FlinkDeploymentControllerTest.java | 94 +------------ .../exception/DeploymentFailedExceptionTest.java | 69 ++++++++++ .../deployment/ApplicationObserverTest.java | 27 +++- .../kubernetes/operator/utils/PodErrorTest.java | 148 +++++++++++++++++++++ 9 files changed, 407 insertions(+), 132 deletions(-) diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml index b455fffa..cff4a43f 100644 --- a/flink-kubernetes-operator/pom.xml +++ b/flink-kubernetes-operator/pom.xml @@ -184,6 +184,13 @@ under the License. </exclusions> </dependency> + <dependency> + <groupId>io.fabric8</groupId> + <artifactId>kube-api-test-client-inject</artifactId> + <version>${fabric8.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>mockwebserver</artifactId> diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java index a8f9abf0..9fc6143f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedException.java @@ -17,15 +17,25 @@ package org.apache.flink.kubernetes.operator.exception; -import io.fabric8.kubernetes.api.model.ContainerStateWaiting; +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet; + +import io.fabric8.kubernetes.api.model.ContainerStatus; import io.fabric8.kubernetes.api.model.apps.DeploymentCondition; +import java.util.Optional; +import java.util.Set; + /** Exception to signal terminal deployment failure. */ public class DeploymentFailedException extends RuntimeException { - public static final String REASON_CRASH_LOOP_BACKOFF = "CrashLoopBackOff"; - public static final String REASON_IMAGE_PULL_BACKOFF = "ImagePullBackOff"; - public static final String REASON_ERR_IMAGE_PULL = "ErrImagePull"; + public static final Set<String> CONTAINER_ERROR_REASONS = + ImmutableSet.of( + "CrashLoopBackOff", + "ImagePullBackOff", + "ErrImagePull", + "RunContainerError", + "CreateContainerConfigError", + "OOMKilled"); private static final long serialVersionUID = -1070179896083579221L; @@ -36,11 +46,6 @@ public class DeploymentFailedException extends RuntimeException { this.reason = deployCondition.getReason(); } - public DeploymentFailedException(ContainerStateWaiting stateWaiting) { - super(stateWaiting.getMessage()); - this.reason = stateWaiting.getReason(); - } - public DeploymentFailedException(String message, String reason) { super(message); this.reason = reason; @@ -49,4 +54,24 @@ public class DeploymentFailedException extends RuntimeException { public String getReason() { return reason; } + + public static DeploymentFailedException forContainerStatus(ContainerStatus status) { + var waiting = status.getState().getWaiting(); + var lastState = status.getLastState(); + String message = null; + if ("CrashLoopBackOff".equals(waiting.getReason()) + && lastState != null + && lastState.getTerminated() != null) { + message = + Optional.ofNullable(lastState.getTerminated().getMessage()) + .map(err -> "CrashLoop - " + err) + .orElse(null); + } + + if (message == null) { + message = waiting.getMessage(); + } + return new DeploymentFailedException( + String.format("[%s] %s", status.getName(), message), waiting.getReason()); + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java index cd817ec8..54e1573b 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java @@ -30,9 +30,9 @@ import org.apache.flink.kubernetes.operator.exception.MissingJobManagerException import org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.kubernetes.operator.utils.EventUtils; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; -import io.fabric8.kubernetes.api.model.ContainerStateWaiting; import io.fabric8.kubernetes.api.model.ContainerStatus; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodList; @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; +import java.util.stream.Stream; /** Base observer for session and application clusters. */ public abstract class AbstractFlinkDeploymentObserver @@ -134,7 +134,7 @@ public abstract class AbstractFlinkDeploymentObserver try { checkFailedCreate(status); // checking the pod is expensive; only do it when the deployment isn't ready - checkContainerBackoff(ctx); + checkContainerErrors(ctx); } catch (DeploymentFailedException dfe) { // throw only when not already in error status to allow for spec update deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING); @@ -171,21 +171,28 @@ public abstract class AbstractFlinkDeploymentObserver } } - private void checkContainerBackoff(FlinkResourceContext<FlinkDeployment> ctx) { + private void checkContainerErrors(FlinkResourceContext<FlinkDeployment> ctx) { PodList jmPods = ctx.getFlinkService().getJmPodList(ctx.getResource(), ctx.getObserveConfig()); for (Pod pod : jmPods.getItems()) { - for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) { - ContainerStateWaiting csw = cs.getState().getWaiting(); - if (csw != null - && Set.of( - DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF, - DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF, - DeploymentFailedException.REASON_ERR_IMAGE_PULL) - .contains(csw.getReason())) { - throw new DeploymentFailedException(csw); - } - } + var podStatus = pod.getStatus(); + Stream.concat( + podStatus.getContainerStatuses().stream(), + podStatus.getInitContainerStatuses().stream()) + .forEach(AbstractFlinkDeploymentObserver::checkContainerError); + + // No obvious errors were found, check for volume mount issues + EventUtils.checkForVolumeMountErrors(ctx.getKubernetesClient(), pod); + } + } + + private static void checkContainerError(ContainerStatus cs) { + if (cs.getState() == null || cs.getState().getWaiting() == null) { + return; + } + if (DeploymentFailedException.CONTAINER_ERROR_REASONS.contains( + cs.getState().getWaiting().getReason())) { + throw DeploymentFailedException.forContainerStatus(cs); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java index b515cdda..fb0dbfc6 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java @@ -17,11 +17,16 @@ package org.apache.flink.kubernetes.operator.utils; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; + import io.fabric8.kubernetes.api.model.Event; import io.fabric8.kubernetes.api.model.EventBuilder; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ObjectMeta; -import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder; +import io.fabric8.kubernetes.api.model.ObjectReference; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodCondition; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import org.slf4j.Logger; @@ -32,10 +37,14 @@ import javax.annotation.Nullable; import java.net.HttpURLConnection; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Predicate; +import java.util.stream.Collectors; /** * The util to generate an event for the target resource. It is copied from @@ -182,13 +191,7 @@ public class EventUtils { String eventName) { return new EventBuilder() .withApiVersion("v1") - .withInvolvedObject( - new ObjectReferenceBuilder() - .withKind(target.getKind()) - .withUid(target.getMetadata().getUid()) - .withName(target.getMetadata().getName()) - .withNamespace(target.getMetadata().getNamespace()) - .build()) + .withInvolvedObject(getObjectReference(target)) .withType(type.name()) .withReason(reason) .withFirstTimestamp(Instant.now().toString()) @@ -235,4 +238,78 @@ public class EventUtils { } return Optional.empty(); } + + private static List<Event> getPodEvents(KubernetesClient client, Pod pod) { + var ref = getObjectReference(pod); + + var eventList = + client.v1() + .events() + .inNamespace(pod.getMetadata().getNamespace()) + .withInvolvedObject(ref) + .list(); + + if (eventList == null) { + return new ArrayList<>(); + } + + var items = eventList.getItems(); + if (items == null) { + return new ArrayList<>(); + } + return items; + } + + @VisibleForTesting + protected static ObjectReference getObjectReference(HasMetadata resource) { + var ref = new ObjectReference(); + ref.setApiVersion(resource.getApiVersion()); + ref.setKind(resource.getKind()); + ref.setName(resource.getMetadata().getName()); + ref.setNamespace(resource.getMetadata().getNamespace()); + ref.setUid(resource.getMetadata().getUid()); + return ref; + } + + /** + * Check that pod is stuck during volume mount stage and throw {@link DeploymentFailedException} + * with the right reason message if that's the case. + * + * @param client Kubernetes client + * @param pod Pod to be checked + */ + public static void checkForVolumeMountErrors(KubernetesClient client, Pod pod) { + var conditions = pod.getStatus().getConditions(); + if (conditions == null) { + return; + } + var conditionMap = + conditions.stream() + .collect(Collectors.toMap(PodCondition::getType, Function.identity())); + + // We use PodReadyToStartContainers if available otherwise use Initialized, but it's only + // there k8s 1.29+ + boolean failedInitialization = + checkStatusWasAlways( + pod, + conditionMap.getOrDefault( + "PodReadyToStartContainers", conditionMap.get("Initialized")), + "False"); + + boolean notReady = checkStatusWasAlways(pod, conditionMap.get("Ready"), "False"); + + if (notReady && failedInitialization) { + getPodEvents(client, pod).stream() + .filter(e -> e.getReason().equals("FailedMount")) + .findAny() + .ifPresent( + e -> { + throw new DeploymentFailedException(e.getMessage(), e.getReason()); + }); + } + } + + private static boolean checkStatusWasAlways(Pod pod, PodCondition condition, String status) { + return condition != null && condition.getStatus().equals(status); + } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java index d9492bfd..e3a3ae25 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java @@ -97,6 +97,24 @@ public class TestUtils extends BaseTestUtils { public static PodList createFailedPodList(String crashLoopMessage, String reason) { ContainerStatus cs = new ContainerStatusBuilder() + .withName("c1") + .withNewState() + .withNewWaiting() + .withReason(reason) + .withMessage(crashLoopMessage) + .endWaiting() + .endState() + .build(); + + Pod pod = getTestPod("host", "apiVersion", Collections.emptyList()); + pod.setStatus(new PodStatusBuilder().withContainerStatuses(cs).build()); + return new PodListBuilder().withItems(pod).build(); + } + + public static PodList createFailedInitContainerPodList(String crashLoopMessage, String reason) { + ContainerStatus cs = + new ContainerStatusBuilder() + .withName("c1") .withNewState() .withNewWaiting() .withReason(reason) @@ -108,7 +126,8 @@ public class TestUtils extends BaseTestUtils { Pod pod = getTestPod("host", "apiVersion", Collections.emptyList()); pod.setStatus( new PodStatusBuilder() - .withContainerStatuses(Collections.singletonList(cs)) + .withContainerStatuses(new ContainerStatusBuilder().withReady().build()) + .withInitContainerStatuses(cs) .build()); return new PodListBuilder().withItems(pod).build(); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 8c3bd33f..2384fb4d 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -35,7 +35,6 @@ import org.apache.flink.kubernetes.operator.api.status.JobStatus; import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus; import org.apache.flink.kubernetes.operator.api.status.TaskManagerInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; -import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler; import org.apache.flink.kubernetes.operator.utils.EventRecorder; @@ -59,7 +58,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import java.util.ArrayList; import java.util.Map; @@ -299,94 +297,6 @@ public class FlinkDeploymentControllerTest { updateControl.getScheduleDelay().get()); } - @ParameterizedTest() - @ValueSource( - strings = { - DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF, - DeploymentFailedException.REASON_IMAGE_PULL_BACKOFF, - DeploymentFailedException.REASON_ERR_IMAGE_PULL - }) - public void verifyInProgressDeploymentWithError(String reason) throws Exception { - String crashLoopMessage = "container fails"; - - var submittedEventValidatingResponseProvider = - new TestUtils.ValidatingResponseProvider<>( - mockedEvent, - r -> - assertTrue( - r.getBody() - .readUtf8() - .contains( - AbstractFlinkResourceReconciler - .MSG_SUBMIT))); - mockServer - .expect() - .post() - .withPath("/api/v1/namespaces/flink-operator-test/events") - .andReply(submittedEventValidatingResponseProvider) - .once(); - - var validatingResponseProvider = - new TestUtils.ValidatingResponseProvider<>( - mockedEvent, - r -> { - String recordedRequestBody = r.getBody().readUtf8(); - assertTrue(recordedRequestBody.contains(reason)); - assertTrue(recordedRequestBody.contains(crashLoopMessage)); - }); - mockServer - .expect() - .post() - .withPath("/api/v1/namespaces/flink-operator-test/events") - .andReply(validatingResponseProvider) - .once(); - - flinkService.setPodList(TestUtils.createFailedPodList(crashLoopMessage, reason)); - - FlinkDeployment appCluster = TestUtils.buildApplicationCluster(); - UpdateControl<FlinkDeployment> updateControl; - - testController.reconcile(appCluster, context); - updateControl = - testController.reconcile( - appCluster, - TestUtils.createContextWithInProgressDeployment(kubernetesClient)); - submittedEventValidatingResponseProvider.assertValidated(); - assertFalse(updateControl.isUpdateStatus()); - assertEquals( - Optional.of( - configManager.getOperatorConfiguration().getReconcileInterval().toMillis()), - updateControl.getScheduleDelay()); - - assertEquals( - JobManagerDeploymentStatus.ERROR, - appCluster.getStatus().getJobManagerDeploymentStatus()); - assertEquals( - org.apache.flink.api.common.JobStatus.RECONCILING, - appCluster.getStatus().getJobStatus().getState()); - - // Validate status status - assertNotNull(appCluster.getStatus().getError()); - - // next cycle should not create another event - updateControl = - testController.reconcile( - appCluster, - TestUtils.createContextWithFailedJobManagerDeployment(kubernetesClient)); - assertEquals( - JobManagerDeploymentStatus.ERROR, - appCluster.getStatus().getJobManagerDeploymentStatus()); - assertFalse(updateControl.isUpdateStatus()); - assertEquals( - ReconciliationUtils.rescheduleAfter( - JobManagerDeploymentStatus.READY, - appCluster, - configManager.getOperatorConfiguration()) - .toMillis(), - updateControl.getScheduleDelay().get()); - validatingResponseProvider.assertValidated(); - } - @ParameterizedTest @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") public void verifyUpgradeFromSavepointLegacyMode(FlinkVersion flinkVersion) throws Exception { @@ -915,9 +825,7 @@ public class FlinkDeploymentControllerTest { @Test public void testSuccessfulObservationShouldClearErrors() throws Exception { final String crashLoopMessage = "deploy errors"; - flinkService.setPodList( - TestUtils.createFailedPodList( - crashLoopMessage, DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)); + flinkService.setPodList(TestUtils.createFailedPodList(crashLoopMessage, "ErrImagePull")); FlinkDeployment appCluster = TestUtils.buildApplicationCluster(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedExceptionTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedExceptionTest.java new file mode 100644 index 00000000..2437d3b9 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/exception/DeploymentFailedExceptionTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.exception; + +import io.fabric8.kubernetes.api.model.ContainerState; +import io.fabric8.kubernetes.api.model.ContainerStateTerminated; +import io.fabric8.kubernetes.api.model.ContainerStateWaiting; +import io.fabric8.kubernetes.api.model.ContainerStatus; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Tests for {@link DeploymentFailedException}. */ +public class DeploymentFailedExceptionTest { + + @Test + public void testErrorFromContainerStatus() { + var containerStatus = new ContainerStatus(); + containerStatus.setName("c1"); + var state = new ContainerState(); + var waiting = new ContainerStateWaiting(); + waiting.setMessage("msg"); + waiting.setReason("r"); + state.setWaiting(waiting); + + containerStatus.setState(state); + + var ex = DeploymentFailedException.forContainerStatus(containerStatus); + assertEquals("[c1] msg", ex.getMessage()); + assertEquals("r", ex.getReason()); + + waiting.setReason("CrashLoopBackOff"); + waiting.setMessage("backing off"); + ex = DeploymentFailedException.forContainerStatus(containerStatus); + assertEquals("[c1] backing off", ex.getMessage()); + assertEquals("CrashLoopBackOff", ex.getReason()); + + // Last state set but not terminated + var lastState = new ContainerState(); + containerStatus.setLastState(lastState); + + ex = DeploymentFailedException.forContainerStatus(containerStatus); + assertEquals("[c1] backing off", ex.getMessage()); + assertEquals("CrashLoopBackOff", ex.getReason()); + + var terminated = new ContainerStateTerminated(); + terminated.setMessage("crash"); + lastState.setTerminated(terminated); + + ex = DeploymentFailedException.forContainerStatus(containerStatus); + assertEquals("[c1] CrashLoop - crash", ex.getMessage()); + assertEquals("CrashLoopBackOff", ex.getReason()); + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java index aa4898d8..68233d39 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java @@ -48,12 +48,16 @@ import io.javaoperatorsdk.operator.api.reconciler.Context; import lombok.Getter; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getCheckpointInfo; import static org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getJobStatus; @@ -753,8 +757,9 @@ public class ApplicationObserverTest extends OperatorTestBase { deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY); } - @Test - public void observeListJobsError() { + @ParameterizedTest + @MethodSource("containerFailureReasons") + public void observeListJobsError(String reason, boolean initError) { bringToReadyStatus(deployment); observer.observe(deployment, readyContext); assertEquals( @@ -762,9 +767,12 @@ public class ApplicationObserverTest extends OperatorTestBase { deployment.getStatus().getJobManagerDeploymentStatus()); // simulate deployment failure String podFailedMessage = "list jobs error"; - flinkService.setPodList( - TestUtils.createFailedPodList( - podFailedMessage, DeploymentFailedException.REASON_CRASH_LOOP_BACKOFF)); + if (initError) { + flinkService.setPodList( + TestUtils.createFailedInitContainerPodList(podFailedMessage, reason)); + } else { + flinkService.setPodList(TestUtils.createFailedPodList(podFailedMessage, reason)); + } flinkService.setPortReady(false); Exception exception = assertThrows( @@ -774,7 +782,14 @@ public class ApplicationObserverTest extends OperatorTestBase { deployment, TestUtils.createContextWithInProgressDeployment( kubernetesClient))); - assertEquals(podFailedMessage, exception.getMessage()); + assertEquals("[c1] " + podFailedMessage, exception.getMessage()); + } + + private static Stream<Arguments> containerFailureReasons() { + return DeploymentFailedException.CONTAINER_ERROR_REASONS.stream() + .flatMap( + reason -> + Stream.of(Arguments.of(reason, true), Arguments.of(reason, false))); } @Test diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java new file mode 100644 index 00000000..d4e57809 --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/PodErrorTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.utils; + +import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException; + +import io.fabric8.kubeapitest.junit.EnableKubeAPIServer; +import io.fabric8.kubernetes.api.model.EventBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.PodCondition; +import io.fabric8.kubernetes.api.model.PodConditionBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +/** Test for {@link EventUtils}. */ +@EnableKubeAPIServer +public class PodErrorTest { + + static KubernetesClient client; + + @Test + public void testVolumeMountErrors() { + var pod = + new PodBuilder() + .withNewMetadata() + .withName("test") + .withNamespace("default") + .endMetadata() + .withNewStatus() + .endStatus() + .build(); + + // No conditions, no error expected + EventUtils.checkForVolumeMountErrors(client, pod); + + var conditions = new ArrayList<PodCondition>(); + pod.getStatus().setConditions(conditions); + + // No conditions, no error expected + EventUtils.checkForVolumeMountErrors(client, pod); + + // Create error events + createPodEvent("e1", "reason1", "msg1", pod); + createPodEvent("e2", "FailedMount", "mountErr", pod); + + var conditionMap = new HashMap<String, String>(); + + // Pod initialized completely, shouldn't check events + conditionMap.put("Initialized", "True"); + conditionMap.put("Ready", "False"); + + conditions.clear(); + conditionMap.forEach( + (t, s) -> + conditions.add( + new PodConditionBuilder().withType(t).withStatus(s).build())); + EventUtils.checkForVolumeMountErrors(client, pod); + + // Pod initialized completely, shouldn't check events + conditionMap.put("PodReadyToStartContainers", "True"); + conditionMap.put("Initialized", "False"); + + conditions.clear(); + conditionMap.forEach( + (t, s) -> + conditions.add( + new PodConditionBuilder().withType(t).withStatus(s).build())); + EventUtils.checkForVolumeMountErrors(client, pod); + + // Check event only when not ready to start + conditionMap.put("PodReadyToStartContainers", "False"); + conditions.clear(); + conditionMap.forEach( + (t, s) -> + conditions.add( + new PodConditionBuilder().withType(t).withStatus(s).build())); + + try { + EventUtils.checkForVolumeMountErrors(client, pod); + fail("Exception not thrown"); + } catch (DeploymentFailedException dfe) { + assertEquals("FailedMount", dfe.getReason()); + assertEquals("mountErr", dfe.getMessage()); + } + + // Old kubernetes without PodReadyToStartContainers + conditionMap.remove("PodReadyToStartContainers"); + conditionMap.put("Initialized", "False"); + conditions.clear(); + conditionMap.forEach( + (t, s) -> + conditions.add( + new PodConditionBuilder().withType(t).withStatus(s).build())); + + try { + EventUtils.checkForVolumeMountErrors(client, pod); + fail("Exception not thrown"); + } catch (DeploymentFailedException dfe) { + assertEquals("FailedMount", dfe.getReason()); + assertEquals("mountErr", dfe.getMessage()); + } + } + + private void createPodEvent(String name, String reason, String msg, Pod pod) { + var event = + new EventBuilder() + .withApiVersion("v1") + .withInvolvedObject(EventUtils.getObjectReference(pod)) + .withType("type") + .withReason(reason) + .withFirstTimestamp(Instant.now().toString()) + .withLastTimestamp(Instant.now().toString()) + .withNewSource() + .withComponent("pod") + .endSource() + .withCount(1) + .withMessage(msg) + .withNewMetadata() + .withName(name) + .withNamespace(pod.getMetadata().getNamespace()) + .endMetadata() + .build(); + client.resource(event).create(); + } +}