This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new d5f47536 [FLINK-32033] Fix Lifecycle Status of FlinkDeployment Resource in case of MISSING/ERROR JM status d5f47536 is described below commit d5f47536034c14a3c12213312f3ad3d9c69e0ec0 Author: nishita-09 <74376090+nishita...@users.noreply.github.com> AuthorDate: Mon Jul 28 13:57:55 2025 +0530 [FLINK-32033] Fix Lifecycle Status of FlinkDeployment Resource in case of MISSING/ERROR JM status --- .../operator/api/status/CommonStatus.java | 23 ++++++++++ .../deployment/AbstractJobReconciler.java | 8 +++- .../deployment/ApplicationReconciler.java | 11 +++-- .../operator/service/AbstractFlinkService.java | 10 ++-- .../kubernetes/operator/TestingFlinkService.java | 11 +++-- .../controller/DeploymentRecoveryTest.java | 2 +- .../controller/FlinkDeploymentControllerTest.java | 4 +- .../lifecycle/ResourceLifecycleMetricsTest.java | 53 ++++++++++++++++++++++ 8 files changed, 108 insertions(+), 14 deletions(-) diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java index 34c8cc99..30649e89 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java @@ -37,6 +37,12 @@ import org.apache.commons.lang3.StringUtils; @SuperBuilder public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> { + // Shared failure-message fragments; also substring-matched below to detect FAILED deployments + public static final String MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED = + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted."; + public static final String MSG_HA_METADATA_NOT_AVAILABLE = "HA metadata is not available"; + public static final String 
MSG_MANUAL_RESTORE_REQUIRED = "Manual restore required."; + /** Last observed status of the Flink job on Application/Session cluster. */ private JobStatus jobStatus = new JobStatus(); @@ -90,6 +96,23 @@ public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> { return ResourceLifecycleState.FAILED; } + // A FlinkDeployment with a MISSING/ERROR JobManager deployment is reported as FAILED only + // when its error text contains one of the MSG_* fragments declared at the top of this class + if (this instanceof FlinkDeploymentStatus) { + FlinkDeploymentStatus deploymentStatus = (FlinkDeploymentStatus) this; + var jmDeployStatus = deploymentStatus.getJobManagerDeploymentStatus(); + + // MISSING/ERROR without a matching error text falls through to the state checks below + if ((jmDeployStatus == JobManagerDeploymentStatus.MISSING + || jmDeployStatus == JobManagerDeploymentStatus.ERROR) + && error != null + && (error.contains(MSG_MANUAL_RESTORE_REQUIRED) + || error.contains(MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED) + || error.contains(MSG_HA_METADATA_NOT_AVAILABLE))) { + return ResourceLifecycleState.FAILED; + } + } + if (reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK) { return ResourceLifecycleState.ROLLED_BACK; } else if (reconciliationStatus.isLastReconciledSpecStable()) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java index 9e79982c..3fcf8e46 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java @@ -59,6 +59,8 @@ import java.time.Instant; import java.util.Optional; import java.util.function.Predicate; +import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE; import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT; @@ -227,7 +229,8 @@ public abstract class AbstractJobReconciler< if (!SnapshotUtils.lastSavepointKnown(status)) { throw new UpgradeFailureException( - "Job is in terminal state but last checkpoint is unknown, possibly due to an unrecoverable restore error. Manual restore required.", + "Job is in terminal state but last checkpoint is unknown, possibly due to an unrecoverable restore error. " + + MSG_MANUAL_RESTORE_REQUIRED, "UpgradeFailed"); } LOG.info("Job is in terminal state, ready for upgrade from observed latest state"); @@ -360,7 +363,8 @@ public abstract class AbstractJobReconciler< var conf = ctx.getObserveConfig(); if (!ctx.getFlinkService().isHaMetadataAvailable(conf)) { - LOG.info("HA metadata not available, cancel will be used instead of last-state"); + LOG.info( + "{}, cancel will be used instead of last-state", MSG_HA_METADATA_NOT_AVAILABLE); return true; } return conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index 7e022dbf..199a5030 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -59,6 +59,9 @@ import java.time.Instant; import java.util.Optional; import java.util.UUID; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED; /** Reconciler Flink Application deployments. */ @@ -114,9 +117,11 @@ public class ApplicationReconciler || jmDeployStatus == JobManagerDeploymentStatus.ERROR) && !flinkService.isHaMetadataAvailable(deployConfig)) { throw new UpgradeFailureException( - "JobManager deployment is missing and HA data is not available to make stateful upgrades. " - + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. " - + "Manual restore required.", + "JobManager deployment is missing and " + + MSG_HA_METADATA_NOT_AVAILABLE + + " to make stateful upgrades. 
" + + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED + + " " + MSG_MANUAL_RESTORE_REQUIRED, "UpgradeFailed"); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 211f2e9d..820fbced 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -155,6 +155,9 @@ import java.util.jar.JarOutputStream; import java.util.jar.Manifest; import java.util.stream.Collectors; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED; import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX; import static org.apache.flink.util.ExceptionUtils.findThrowable; @@ -567,7 +570,7 @@ public abstract class AbstractFlinkService implements FlinkService { .getExternalPointer() .equals(NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER)) { throw new UpgradeFailureException( - "Latest checkpoint not externally addressable, manual recovery required.", + "Latest checkpoint not externally addressable. " + MSG_MANUAL_RESTORE_REQUIRED, "CheckpointNotFound"); } return latestCheckpointOpt.map( @@ -1022,8 +1025,9 @@ public abstract class AbstractFlinkService implements FlinkService { private void validateHaMetadataExists(Configuration conf) { if (!isHaMetadataAvailable(conf)) { throw new UpgradeFailureException( - "HA metadata 
not available to restore from last state. " - + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. ", + MSG_HA_METADATA_NOT_AVAILABLE + + " to restore from last state. " + + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED, "RestoreFailed"); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index bee769fb..c020c53d 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -105,6 +105,10 @@ import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED; + /** Flink service mock for tests. */ public class TestingFlinkService extends AbstractFlinkService { @@ -244,9 +248,10 @@ public class TestingFlinkService extends AbstractFlinkService { protected void validateHaMetadataExists(Configuration conf) { if (!isHaMetadataAvailable(conf)) { throw new UpgradeFailureException( - "HA metadata not available to restore from last state. " - + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. " - + "Manual restore required.", + MSG_HA_METADATA_NOT_AVAILABLE + + " to restore from last state. 
" + + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED + + " " + MSG_MANUAL_RESTORE_REQUIRED, "RestoreFailed"); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java index 76d03c9b..07d23a5d 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java @@ -121,7 +121,7 @@ public class DeploymentRecoveryTest { .getStatus() .getError() .contains( - "JobManager deployment is missing and HA data is not available to make stateful upgrades.")); + "JobManager deployment is missing and HA metadata is not available")); } else { flinkService.setPortReady(true); testController.reconcile(appCluster, context); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java index 21ba67ff..d0544b10 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java @@ -1054,7 +1054,7 @@ public class FlinkDeploymentControllerTest { .flinkResourceEvents() .poll() .getMessage() - .contains("HA metadata not available to restore from last state.")); + .contains("HA metadata is not available to restore from last state.")); testController.flinkResourceEvents().clear(); testController.reconcile(appCluster, context); @@ -1068,7 +1068,7 @@ public class FlinkDeploymentControllerTest { .flinkResourceEvents() .poll() .getMessage() - 
.contains("HA metadata not available to restore from last state.")); + .contains("HA metadata is not available to restore from last state.")); flinkService.setHaDataAvailable(true); testController.flinkResourceEvents().clear(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java index a399d871..212b0da7 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java @@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.kubernetes.operator.api.spec.JobState; +import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.ReconciliationState; import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics; @@ -52,6 +53,9 @@ import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecyc import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.STABLE; import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.SUSPENDED; import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.UPGRADING; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE; +import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -337,4 +341,53 @@ public class ResourceLifecycleMetricsTest { } return histos; } + + @Test + public void testUnrecoverableDeploymentLifecycleState() { + var application = TestUtils.buildApplicationCluster(); + + // Set up the deployment to simulate it has been deployed (so isBeforeFirstDeployment = + // false) + ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration()); + application.getStatus().getReconciliationStatus().markReconciledSpecAsStable(); + + application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR); + application + .getStatus() + .setError( + "JobManager deployment is missing and " + + MSG_HA_METADATA_NOT_AVAILABLE + + " to make stateful upgrades. " + + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED + + " " + MSG_MANUAL_RESTORE_REQUIRED); + assertEquals( + FAILED, + application.getStatus().getLifecycleState(), + "ERROR deployment with `configmaps have been deleted` error should always be FAILED (terminal error state)"); + + application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); + application + .getStatus() + .setError( + MSG_HA_METADATA_NOT_AVAILABLE + + " to restore from last state. 
" + + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED); + assertEquals( + FAILED, + application.getStatus().getLifecycleState(), + "MISSING deployment with error should be FAILED"); + + application.getStatus().setError(null); + application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); + // Reset to DEPLOYED state (not stable yet) to simulate ongoing deployment + application.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED); + application + .getStatus() + .getReconciliationStatus() + .setLastStableSpec(null); // Mark as not stable + assertEquals( + DEPLOYED, + application.getStatus().getLifecycleState(), + "MISSING deployment before stability should not be FAILED yet (still deploying)"); + } }