This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new d5f47536 [FLINK-32033] Fix Lifecycle Status of FlinkDeployment Resource in case of MISSING/ERROR JM status
d5f47536 is described below

commit d5f47536034c14a3c12213312f3ad3d9c69e0ec0
Author: nishita-09 <74376090+nishita...@users.noreply.github.com>
AuthorDate: Mon Jul 28 13:57:55 2025 +0530

    [FLINK-32033] Fix Lifecycle Status of FlinkDeployment Resource in case of MISSING/ERROR JM status
---
 .../operator/api/status/CommonStatus.java          | 23 ++++++++++
 .../deployment/AbstractJobReconciler.java          |  8 +++-
 .../deployment/ApplicationReconciler.java          | 11 +++--
 .../operator/service/AbstractFlinkService.java     | 10 ++--
 .../kubernetes/operator/TestingFlinkService.java   | 11 +++--
 .../controller/DeploymentRecoveryTest.java         |  2 +-
 .../controller/FlinkDeploymentControllerTest.java  |  4 +-
 .../lifecycle/ResourceLifecycleMetricsTest.java    | 53 ++++++++++++++++++++++
 8 files changed, 108 insertions(+), 14 deletions(-)

diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
index 34c8cc99..30649e89 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
@@ -37,6 +37,12 @@ import org.apache.commons.lang3.StringUtils;
 @SuperBuilder
 public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
 
+    // Frequent error message constants for resource failure reporting
+    public static final String MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED =
+            "It is possible that the job has finished or terminally failed, or 
the configmaps have been deleted.";
+    public static final String MSG_HA_METADATA_NOT_AVAILABLE = "HA metadata is 
not available";
+    public static final String MSG_MANUAL_RESTORE_REQUIRED = "Manual restore 
required.";
+
     /** Last observed status of the Flink job on Application/Session cluster. 
*/
     private JobStatus jobStatus = new JobStatus();
 
@@ -90,6 +96,23 @@ public abstract class CommonStatus<SPEC extends 
AbstractFlinkSpec> {
             return ResourceLifecycleState.FAILED;
         }
 
+        // Check for unrecoverable deployments that should be marked as FAILED 
if the error contains
+        // the following substrings
+        if (this instanceof FlinkDeploymentStatus) {
+            FlinkDeploymentStatus deploymentStatus = (FlinkDeploymentStatus) 
this;
+            var jmDeployStatus = 
deploymentStatus.getJobManagerDeploymentStatus();
+
+            // ERROR/MISSING deployments are in terminal error state and 
should always be FAILED
+            if ((jmDeployStatus == JobManagerDeploymentStatus.MISSING
+                            || jmDeployStatus == 
JobManagerDeploymentStatus.ERROR)
+                    && error != null
+                    && (error.contains(MSG_MANUAL_RESTORE_REQUIRED)
+                            || 
error.contains(MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED)
+                            || error.contains(MSG_HA_METADATA_NOT_AVAILABLE))) 
{
+                return ResourceLifecycleState.FAILED;
+            }
+        }
+
         if (reconciliationStatus.getState() == 
ReconciliationState.ROLLED_BACK) {
             return ResourceLifecycleState.ROLLED_BACK;
         } else if (reconciliationStatus.isLastReconciledSpecStable()) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 9e79982c..3fcf8e46 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -59,6 +59,8 @@ import java.time.Instant;
 import java.util.Optional;
 import java.util.function.Predicate;
 
+import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
+import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE;
 import static 
org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT;
@@ -227,7 +229,8 @@ public abstract class AbstractJobReconciler<
 
             if (!SnapshotUtils.lastSavepointKnown(status)) {
                 throw new UpgradeFailureException(
-                        "Job is in terminal state but last checkpoint is 
unknown, possibly due to an unrecoverable restore error. Manual restore 
required.",
+                        "Job is in terminal state but last checkpoint is 
unknown, possibly due to an unrecoverable restore error. "
+                                + MSG_MANUAL_RESTORE_REQUIRED,
                         "UpgradeFailed");
             }
             LOG.info("Job is in terminal state, ready for upgrade from 
observed latest state");
@@ -360,7 +363,8 @@ public abstract class AbstractJobReconciler<
 
         var conf = ctx.getObserveConfig();
         if (!ctx.getFlinkService().isHaMetadataAvailable(conf)) {
-            LOG.info("HA metadata not available, cancel will be used instead 
of last-state");
+            LOG.info(
+                    "{}, cancel will be used instead of last-state", 
MSG_HA_METADATA_NOT_AVAILABLE);
             return true;
         }
         return 
conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 7e022dbf..199a5030 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -59,6 +59,9 @@ import java.time.Instant;
 import java.util.Optional;
 import java.util.UUID;
 
+import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
+import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED;
+import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
 
 /** Reconciler Flink Application deployments. */
@@ -114,9 +117,11 @@ public class ApplicationReconciler
                         || jmDeployStatus == JobManagerDeploymentStatus.ERROR)
                 && !flinkService.isHaMetadataAvailable(deployConfig)) {
             throw new UpgradeFailureException(
-                    "JobManager deployment is missing and HA data is not 
available to make stateful upgrades. "
-                            + "It is possible that the job has finished or 
terminally failed, or the configmaps have been deleted. "
-                            + "Manual restore required.",
+                    "JobManager deployment is missing and "
+                            + MSG_HA_METADATA_NOT_AVAILABLE
+                            + " to make stateful upgrades. "
+                            + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED
+                            + MSG_MANUAL_RESTORE_REQUIRED,
                     "UpgradeFailed");
         }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 211f2e9d..820fbced 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -155,6 +155,9 @@ import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
+import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED;
+import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;
 import static 
org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
@@ -567,7 +570,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                         .getExternalPointer()
                         
.equals(NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER)) {
             throw new UpgradeFailureException(
-                    "Latest checkpoint not externally addressable, manual 
recovery required.",
+                    "Latest checkpoint not externally addressable, " + 
MSG_MANUAL_RESTORE_REQUIRED,
                     "CheckpointNotFound");
         }
         return latestCheckpointOpt.map(
@@ -1022,8 +1025,9 @@ public abstract class AbstractFlinkService implements 
FlinkService {
     private void validateHaMetadataExists(Configuration conf) {
         if (!isHaMetadataAvailable(conf)) {
             throw new UpgradeFailureException(
-                    "HA metadata not available to restore from last state. "
-                            + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. ",
+                    MSG_HA_METADATA_NOT_AVAILABLE
+                            + " to restore from last state. "
+                            + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED,
                     "RestoreFailed");
         }
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index bee769fb..c020c53d 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -105,6 +105,10 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
+import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED;
+import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;
+
 /** Flink service mock for tests. */
 public class TestingFlinkService extends AbstractFlinkService {
 
@@ -244,9 +248,10 @@ public class TestingFlinkService extends 
AbstractFlinkService {
     protected void validateHaMetadataExists(Configuration conf) {
         if (!isHaMetadataAvailable(conf)) {
             throw new UpgradeFailureException(
-                    "HA metadata not available to restore from last state. "
-                            + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
-                            + "Manual restore required.",
+                    MSG_HA_METADATA_NOT_AVAILABLE
+                            + " to restore from last state. "
+                            + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED
+                            + (" " + MSG_MANUAL_RESTORE_REQUIRED), // separate the sentences
                     "RestoreFailed");
         }
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
index 76d03c9b..07d23a5d 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
@@ -121,7 +121,7 @@ public class DeploymentRecoveryTest {
                             .getStatus()
                             .getError()
                             .contains(
-                                    "JobManager deployment is missing and HA 
data is not available to make stateful upgrades."));
+                                    "JobManager deployment is missing and HA 
metadata is not available"));
         } else {
             flinkService.setPortReady(true);
             testController.reconcile(appCluster, context);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 21ba67ff..d0544b10 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -1054,7 +1054,7 @@ public class FlinkDeploymentControllerTest {
                         .flinkResourceEvents()
                         .poll()
                         .getMessage()
-                        .contains("HA metadata not available to restore from 
last state."));
+                        .contains("HA metadata is not available to restore 
from last state."));
 
         testController.flinkResourceEvents().clear();
         testController.reconcile(appCluster, context);
@@ -1068,7 +1068,7 @@ public class FlinkDeploymentControllerTest {
                         .flinkResourceEvents()
                         .poll()
                         .getMessage()
-                        .contains("HA metadata not available to restore from 
last state."));
+                        .contains("HA metadata is not available to restore 
from last state."));
 
         flinkService.setHaDataAvailable(true);
         testController.flinkResourceEvents().clear();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
index a399d871..212b0da7 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.api.spec.JobState;
+import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics;
@@ -52,6 +53,9 @@ import static 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecyc
 import static 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.STABLE;
 import static 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.SUSPENDED;
 import static 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.UPGRADING;
+import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE;
+import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED;
+import static 
org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -337,4 +341,53 @@ public class ResourceLifecycleMetricsTest {
         }
         return histos;
     }
+
+    @Test
+    public void testUnrecoverableDeploymentLifecycleState() {
+        var application = TestUtils.buildApplicationCluster();
+
+        // Mark the spec as deployed and stable so isBeforeFirstDeployment is false.
+        ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration());
+        application.getStatus().getReconciliationStatus().markReconciledSpecAsStable();
+
+        application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
+        application
+                .getStatus()
+                .setError(
+                        "JobManager deployment is missing and "
+                                + MSG_HA_METADATA_NOT_AVAILABLE
+                                + " to make stateful upgrades. "
+                                + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED
+                                + " "
+                                + MSG_MANUAL_RESTORE_REQUIRED);
+        assertEquals(
+                FAILED,
+                application.getStatus().getLifecycleState(),
+                "ERROR deployment with `configmaps have been deleted` error should always be FAILED (terminal error state)");
+
+        application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+        application
+                .getStatus()
+                .setError(
+                        MSG_HA_METADATA_NOT_AVAILABLE
+                                + " to restore from last state. "
+                                + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED);
+        assertEquals(
+                FAILED,
+                application.getStatus().getLifecycleState(),
+                "MISSING deployment with error should be FAILED");
+
+        application.getStatus().setError(null);
+        application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+        // Reset to DEPLOYED state (not stable yet) to simulate ongoing deployment
+        application.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
+        application
+                .getStatus()
+                .getReconciliationStatus()
+                .setLastStableSpec(null); // Mark as not stable
+        assertEquals(
+                DEPLOYED,
+                application.getStatus().getLifecycleState(),
+                "MISSING deployment before stability should not be FAILED yet (still deploying)");
+    }
 }

Reply via email to