This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new cda493e5 [FLINK-37455] Create error Event when job goes into FAILED state cda493e5 is described below commit cda493e52e97c09886a466cb9ecad8b679bedcb0 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Fri Mar 28 08:46:14 2025 +0100 [FLINK-37455] Create error Event when job goes into FAILED state --- .../controller/FlinkDeploymentController.java | 45 ++++++---------------- .../controller/FlinkSessionJobController.java | 18 +++++---- .../operator/observer/JobStatusObserver.java | 18 ++++++--- .../kubernetes/operator/utils/EventRecorder.java | 3 +- .../kubernetes/operator/utils/ExceptionUtils.java | 9 +++-- .../kubernetes/operator/TestingFlinkService.java | 22 +++++++++++ .../controller/FlinkDeploymentControllerTest.java | 6 +-- .../controller/FlinkSessionJobControllerTest.java | 4 +- .../operator/observer/JobStatusObserverTest.java | 33 ++++++++++++++++ .../operator/utils/ExceptionUtilsTest.java | 21 ++++++++-- 10 files changed, 119 insertions(+), 60 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java index bae03cba..51235b8e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java @@ -155,17 +155,15 @@ public class FlinkDeploymentController statusRecorder.patchAndCacheStatus(flinkApp, ctx.getKubernetesClient()); reconcilerFactory.getOrCreate(flinkApp).reconcile(ctx); } catch (UpgradeFailureException ufe) { - handleUpgradeFailure(ctx, ufe); + ReconciliationUtils.updateForReconciliationError(ctx, ufe); + triggerErrorEvent(ctx, ufe, ufe.getReason()); } catch (DeploymentFailedException dfe) { - 
handleDeploymentFailed(ctx, dfe); + flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR); + flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING); + ReconciliationUtils.updateForReconciliationError(ctx, dfe); + triggerErrorEvent(ctx, dfe, dfe.getReason()); } catch (Exception e) { - eventRecorder.triggerEvent( - flinkApp, - EventRecorder.Type.Warning, - "ClusterDeploymentException", - ExceptionUtils.getExceptionMessage(e), - EventRecorder.Component.JobManagerDeployment, - josdkContext.getClient()); + triggerErrorEvent(ctx, e, EventRecorder.Reason.Error.name()); throw new ReconciliationException(e); } @@ -175,32 +173,13 @@ public class FlinkDeploymentController ctx.getOperatorConfig(), flinkApp, previousDeployment, true); } - private void handleDeploymentFailed( - FlinkResourceContext<FlinkDeployment> ctx, DeploymentFailedException dfe) { - var flinkApp = ctx.getResource(); - LOG.error("Flink Deployment failed", dfe); - flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR); - flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING); - ReconciliationUtils.updateForReconciliationError(ctx, dfe); + private void triggerErrorEvent( + FlinkResourceContext<FlinkDeployment> ctx, Exception e, String reason) { eventRecorder.triggerEvent( - flinkApp, - EventRecorder.Type.Warning, - dfe.getReason(), - dfe.getMessage(), - EventRecorder.Component.JobManagerDeployment, - ctx.getKubernetesClient()); - } - - private void handleUpgradeFailure( - FlinkResourceContext<FlinkDeployment> ctx, UpgradeFailureException ufe) { - LOG.error("Error while upgrading Flink Deployment", ufe); - var flinkApp = ctx.getResource(); - ReconciliationUtils.updateForReconciliationError(ctx, ufe); - eventRecorder.triggerEvent( - flinkApp, + ctx.getResource(), EventRecorder.Type.Warning, - ufe.getReason(), - ufe.getMessage(), + reason, + ExceptionUtils.getExceptionMessage(e), EventRecorder.Component.JobManagerDeployment, 
ctx.getKubernetesClient()); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java index 4838dea8..7454864f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java @@ -121,13 +121,7 @@ public class FlinkSessionJobController statusRecorder.patchAndCacheStatus(flinkSessionJob, ctx.getKubernetesClient()); reconciler.reconcile(ctx); } catch (Exception e) { - eventRecorder.triggerEvent( - flinkSessionJob, - EventRecorder.Type.Warning, - "SessionJobException", - ExceptionUtils.getExceptionMessage(e), - EventRecorder.Component.Job, - josdkContext.getClient()); + triggerErrorEvent(ctx, e); throw new ReconciliationException(e); } statusRecorder.patchAndCacheStatus(flinkSessionJob, ctx.getKubernetesClient()); @@ -167,6 +161,16 @@ public class FlinkSessionJobController return deleteControl; } + private void triggerErrorEvent(FlinkResourceContext<?> ctx, Exception e) { + eventRecorder.triggerEvent( + ctx.getResource(), + EventRecorder.Type.Warning, + EventRecorder.Reason.Error.name(), + ExceptionUtils.getExceptionMessage(e), + EventRecorder.Component.Job, + ctx.getKubernetesClient()); + } + @Override public ErrorStatusUpdateControl<FlinkSessionJob> updateErrorStatus( FlinkSessionJob sessionJob, Context<FlinkSessionJob> context, Exception e) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java index c94c1231..444d0a0a 100644 --- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java @@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.api.status.ReconciliationState; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.kubernetes.operator.utils.ExceptionUtils; import org.apache.flink.runtime.client.JobStatusMessage; import org.slf4j.Logger; @@ -182,7 +183,7 @@ public class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> { markSuspended(resource); } - setErrorIfPresent(ctx, clusterJobStatus); + recordJobErrorIfPresent(ctx, clusterJobStatus); eventRecorder.triggerEvent( resource, EventRecorder.Type.Normal, @@ -203,7 +204,8 @@ public class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> { }); } - private void setErrorIfPresent(FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) { + private void recordJobErrorIfPresent( + FlinkResourceContext<R> ctx, JobStatusMessage clusterJobStatus) { if (clusterJobStatus.getJobState() == JobStatus.FAILED) { try { var result = @@ -215,10 +217,14 @@ public class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> { t -> { updateFlinkResourceException( t, ctx.getResource(), ctx.getOperatorConfig()); - LOG.error( - "Job {} failed with error: {}", - clusterJobStatus.getJobId(), - t.getFullStringifiedStackTrace()); + + eventRecorder.triggerEvent( + ctx.getResource(), + EventRecorder.Type.Warning, + EventRecorder.Reason.Error, + EventRecorder.Component.Job, + ExceptionUtils.getExceptionMessage(t), + ctx.getKubernetesClient()); }); } catch (Exception e) { LOG.warn("Failed to request the job result", e); diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java index 4af3f190..1989de0e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java @@ -314,6 +314,7 @@ public class EventRecorder { Scaling, UnsupportedFlinkVersion, SnapshotError, - SnapshotAbandoned + SnapshotAbandoned, + Error } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java index ad7bd6be..002c43b7 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtils.java @@ -75,7 +75,7 @@ public class ExceptionUtils { * → cause3" */ public static String getExceptionMessage(Throwable throwable) { - return getExceptionMessage(throwable, 0); + return getExceptionMessage(throwable, 1); } /** @@ -93,11 +93,12 @@ public class ExceptionUtils { } if (throwable instanceof SerializedThrowable) { + var serialized = ((SerializedThrowable) throwable); var deserialized = - ((SerializedThrowable) throwable) - .deserializeError(Thread.currentThread().getContextClassLoader()); + serialized.deserializeError(Thread.currentThread().getContextClassLoader()); if (deserialized == throwable) { - return "Unknown Error (SerializedThrowable)"; + var msg = serialized.getMessage(); + return msg != null ? 
msg : serialized.getOriginalErrorClassName(); } else { return getExceptionMessage(deserialized, level); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index 3dfc34a9..fa893e7c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -52,6 +52,7 @@ import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper; import org.apache.flink.kubernetes.operator.service.SuspendMode; import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal; import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; @@ -141,6 +142,7 @@ public class TestingFlinkService extends AbstractFlinkService { @Getter private final Map<String, Boolean> checkpointTriggers = new HashMap<>(); private final Map<Long, String> checkpointStats = new HashMap<>(); @Setter private boolean throwCheckpointingDisabledError = false; + @Setter private Throwable jobFailedErr; @Getter private int desiredReplicas = 0; @Getter private int cancelJobCallCount = 0; @@ -301,9 +303,29 @@ public class TestingFlinkService extends AbstractFlinkService { if (!isPortReady) { throw new TimeoutException("JM port is unavailable"); } + + if (jobFailedErr != null) { + return Optional.of(new JobStatusMessage(jobID, "n", JobStatus.FAILED, 0)); + } + return super.getJobStatus(conf, jobID); } + @Override + public JobResult requestJobResult(Configuration conf, JobID jobID) throws Exception { + if 
(jobFailedErr != null) { + return new JobResult.Builder() + .jobId(jobID) + .serializedThrowable(new SerializedThrowable(jobFailedErr)) + .netRuntime(1) + .accumulatorResults(new HashMap<>()) + .applicationStatus(ApplicationStatus.FAILED) + .build(); + } + + return super.requestJobResult(conf, jobID); + } + public List<Tuple3<String, JobStatusMessage, Configuration>> listJobs() { return jobs; } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 18273c63..8c3bd33f 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -981,7 +981,7 @@ public class FlinkDeploymentControllerTest { var event = testController.flinkResourceEvents().remove(); assertEquals("Submit", event.getReason()); event = testController.flinkResourceEvents().remove(); - assertEquals("ClusterDeploymentException", event.getReason()); + assertEquals("Error", event.getReason()); assertEquals("Deployment failure", event.getMessage()); } @@ -1006,7 +1006,7 @@ public class FlinkDeploymentControllerTest { var event = testController.flinkResourceEvents().remove(); assertEquals("Submit", event.getReason()); event = testController.flinkResourceEvents().remove(); - assertEquals("ClusterDeploymentException", event.getReason()); + assertEquals("Error", event.getReason()); assertEquals( "Deployment Failure -> IllegalStateException -> actual failure reason", event.getMessage()); @@ -1112,7 +1112,7 @@ public class FlinkDeploymentControllerTest { var event = testController.flinkResourceEvents().remove(); assertEquals("Submit", event.getReason()); event = 
testController.flinkResourceEvents().remove(); - assertEquals("ClusterDeploymentException", event.getReason()); + assertEquals("Error", event.getReason()); assertEquals( "Deployment Failure -> IllegalStateException -> actual failure reason", event.getMessage()); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java index b7ad6f13..f0489cd2 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java @@ -103,7 +103,7 @@ class FlinkSessionJobControllerTest { var event = testController.events().remove(); Assertions.assertEquals(EventRecorder.Type.Warning.toString(), event.getType()); - Assertions.assertEquals("SessionJobException", event.getReason()); + Assertions.assertEquals("Error", event.getReason()); testController.cleanup(sessionJob, context); } @@ -635,7 +635,7 @@ class FlinkSessionJobControllerTest { var event = testController.events().remove(); assertEquals("Submit", event.getReason()); event = testController.events().remove(); - assertEquals("SessionJobException", event.getReason()); + assertEquals("Error", event.getReason()); assertEquals( "Deployment Failure -> IllegalStateException -> actual failure reason", event.getMessage()); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java index 512e9f7c..aee49e3c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java +++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java @@ -29,10 +29,12 @@ import org.apache.flink.kubernetes.operator.api.spec.JobState; import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.utils.EventRecorder; +import org.apache.flink.util.SerializedThrowable; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import lombok.Getter; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; @@ -42,6 +44,7 @@ import java.util.ArrayList; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** Tests for the {@link JobStatusObserver}. 
*/ @EnableKubernetesMockClient(crud = true) @@ -114,6 +117,36 @@ public class JobStatusObserverTest extends OperatorTestBase { .getState()); } + @Test + void testFailed() throws Exception { + var observer = new JobStatusObserver<>(eventRecorder); + var deployment = initDeployment(); + var status = deployment.getStatus(); + var jobStatus = status.getJobStatus(); + jobStatus.setState(JobStatus.RUNNING); + FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx = getResourceContext(deployment); + flinkService.submitApplicationCluster( + deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false); + + // Mark failed + flinkService.setJobFailedErr( + new Exception("job err", new SerializedThrowable(new Exception("root")))); + observer.observe(ctx); + + // First event should be job error reported + var jobErrorEvent = flinkResourceEventCollector.events.poll(); + assertEquals(EventRecorder.Reason.Error.name(), jobErrorEvent.getReason()); + assertEquals("job err -> root", jobErrorEvent.getMessage()); + + // Make sure job status still reported + assertEquals( + EventRecorder.Reason.JobStatusChanged.name(), + flinkResourceEventCollector.events.poll().getReason()); + + observer.observe(ctx); + assertTrue(flinkResourceEventCollector.events.isEmpty()); + } + private static Stream<Arguments> cancellingArgs() { var args = new ArrayList<Arguments>(); for (var status : JobStatus.values()) { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtilsTest.java index 1bd8873d..144ec318 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ExceptionUtilsTest.java @@ -47,17 +47,30 @@ public class ExceptionUtilsTest { var ex2 = new RuntimeException("Cause 2", 
new SerializedThrowable(ex3)); var ex = new RuntimeException("Cause 1", ex2); assertThat(ExceptionUtils.getExceptionMessage(ex)) - .isEqualTo("Cause 1 -> Cause 2 -> Cause 3 -> Cause 4"); + .isEqualTo("Cause 1 -> Cause 2 -> Cause 3"); } @Test void testSerializedThrowableError() { - var serializedException = new SerializedThrowable(new NonSerializableException()); - assertThat(ExceptionUtils.getExceptionMessage(serializedException)) - .isEqualTo("Unknown Error (SerializedThrowable)"); + assertThat( + ExceptionUtils.getExceptionMessage( + new SerializedThrowable(new NonSerializableException("Message")))) + .isEqualTo(String.format("%s: Message", NonSerializableException.class.getName())); + + assertThat( + ExceptionUtils.getExceptionMessage( + new SerializedThrowable(new NonSerializableException()))) + .isEqualTo(NonSerializableException.class.getName()); } private static class NonSerializableException extends Exception { + + public NonSerializableException(String message) { + super(message); + } + + public NonSerializableException() {} + private void writeObject(java.io.ObjectOutputStream stream) throws IOException { throw new IOException(); }