This is an automated email from the ASF dual-hosted git repository.

davidrad pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 56af6bf3 [FLINK-33634] Add Conditions to Flink CRD's Status field 
(#1064)
56af6bf3 is described below

commit 56af6bf3a94494923497e709faf29fb3749cd45f
Author: David Radley <[email protected]>
AuthorDate: Mon Mar 2 15:17:23 2026 +0000

    [FLINK-33634] Add Conditions to Flink CRD's Status field (#1064)
    
    Added conditions to FlinkDeployment, FlinkSessionJob, and FlinkStateSnapshot
    CRD status fields to provide standardised Kubernetes-style status reporting.
    
    Signed-off-by: [email protected] <[email protected]>
    Co-authored-by: Lajith <[email protected]>
---
 .gitignore                                         |   4 +-
 docs/content/docs/custom-resource/reference.md     |   3 +
 .../flink/kubernetes/operator/api}/Mode.java       |   5 +-
 .../operator/api/status/FlinkDeploymentStatus.java |  45 +++
 .../api/status/JobManagerDeploymentStatus.java     |  28 +-
 .../operator/api/utils/ConditionsUtils.java        | 123 ++++++++
 .../FlinkDeploymentStatusConditionsTest.java       | 320 +++++++++++++++++++++
 .../operator/api/utils/BaseTestUtils.java          |  43 +++
 .../controller/FlinkDeploymentController.java      |   1 +
 .../observer/deployment/ApplicationObserver.java   |   2 +-
 .../deployment/FlinkDeploymentObserverFactory.java |   2 +-
 .../observer/deployment/SessionObserver.java       |   2 +-
 .../reconciler/deployment/ReconcilerFactory.java   |   2 +-
 .../operator/service/StandaloneFlinkService.java   |   2 +-
 .../controller/FlinkDeploymentControllerTest.java  |  25 +-
 .../service/StandaloneFlinkServiceTest.java        |   2 +-
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |  17 ++
 17 files changed, 607 insertions(+), 19 deletions(-)

diff --git a/.gitignore b/.gitignore
index 0fd15a8f..527bce74 100644
--- a/.gitignore
+++ b/.gitignore
@@ -40,4 +40,6 @@ buildNumber.properties
 .kube
 
 # VSCode settings
-.vscode/
\ No newline at end of file
+.vscode/
+.metals
+.bloop
diff --git a/docs/content/docs/custom-resource/reference.md 
b/docs/content/docs/custom-resource/reference.md
index 35f6e9be..76b303ce 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -397,6 +397,7 @@ This serves as a full reference for FlinkDeployment and 
FlinkSessionJob custom r
 | jobManagerDeploymentStatus | 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus | 
Last observed status of the JobManager deployment. |
 | reconciliationStatus | 
org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus
 | Status of the last reconcile operation. |
 | taskManager | 
org.apache.flink.kubernetes.operator.api.status.TaskManagerInfo | Information 
about the TaskManagers for the scale subresource. |
+| conditions | java.util.List<io.fabric8.kubernetes.api.model.Condition> | 
Condition of the CR . |
 
 ### FlinkSessionJobReconciliationStatus
 **Class**: 
org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus
@@ -476,6 +477,8 @@ This serves as a full reference for FlinkDeployment and 
FlinkSessionJob custom r
 | DEPLOYING | JobManager process is starting up. |
 | MISSING | JobManager deployment not found, probably not started or killed by 
user. |
 | ERROR | Deployment in terminal error, requires spec change for 
reconciliation to continue. |
+| reason | java.lang.String |  |
+| message | java.lang.String |  |
 
 ### JobStatus
 **Class**: org.apache.flink.kubernetes.operator.api.status.JobStatus
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java
similarity index 91%
rename from 
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
rename to 
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java
index adb37e7c..5f5392d5 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.config;
+package org.apache.flink.kubernetes.operator.api;
 
-import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 
 /** The mode of {@link FlinkDeployment}. */
@@ -45,7 +44,7 @@ public enum Mode {
                 : getMode(lastReconciledSpec);
     }
 
-    private static Mode getMode(FlinkDeploymentSpec spec) {
+    public static Mode getMode(FlinkDeploymentSpec spec) {
         return spec.getJob() != null ? APPLICATION : SESSION;
     }
 }
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
index 9a341299..afdb1c26 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
@@ -18,10 +18,13 @@
 package org.apache.flink.kubernetes.operator.api.status;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.kubernetes.operator.api.Mode;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.utils.ConditionsUtils;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.fabric8.kubernetes.api.model.Condition;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -29,7 +32,9 @@ import lombok.NoArgsConstructor;
 import lombok.ToString;
 import lombok.experimental.SuperBuilder;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** Last observed status of the Flink deployment. */
@@ -43,6 +48,8 @@ import java.util.Map;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class FlinkDeploymentStatus extends CommonStatus<FlinkDeploymentSpec> {
 
+    public static final String CONDITION_TYPE_RUNNING = "Running";
+
     /** Information from running clusters. */
     private Map<String, String> clusterInfo = new HashMap<>();
 
@@ -57,9 +64,47 @@ public class FlinkDeploymentStatus extends 
CommonStatus<FlinkDeploymentSpec> {
     /** Information about the TaskManagers for the scale subresource. */
     private TaskManagerInfo taskManager;
 
+    /** Condition of the CR . */
+    private List<Condition> conditions = new ArrayList<>();
+
     @JsonIgnore
     @Override
     public boolean isJobCancellable() {
         return super.isJobCancellable() && 
jobManagerDeploymentStatus.isRestApiAvailable();
     }
+
+    /**
+     * Update the conditions with a RUNNING condition if required.
+     *
+     * @return a list of Conditions that will be either empty or containing a 
running condition.
+     */
+    public List<Condition> setRunningConditionIfRequired() {
+        Condition conditionToAdd = null;
+
+        if (reconciliationStatus != null) {
+            FlinkDeploymentSpec deploymentSpec =
+                    reconciliationStatus.deserializeLastReconciledSpec();
+
+            if (deploymentSpec != null) {
+                switch (Mode.getMode(deploymentSpec)) {
+                    case APPLICATION:
+                        conditionToAdd =
+                                ConditionsUtils.createApplicationModeCondition(
+                                        getJobStatus().getState());
+                        break;
+                    case SESSION:
+                        conditionToAdd =
+                                ConditionsUtils.createSessionModeCondition(
+                                        jobManagerDeploymentStatus);
+                }
+                ConditionsUtils.updateLastTransitionTime(conditions, 
conditionToAdd);
+            }
+        }
+        List<Condition> newConditions = new ArrayList<>();
+        if (conditionToAdd != null) {
+            newConditions = new ArrayList<>(List.of(conditionToAdd));
+        }
+        setConditions(newConditions);
+        return newConditions;
+    }
 }
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
index 2c86c49a..98250774 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
@@ -21,20 +21,38 @@ package org.apache.flink.kubernetes.operator.api.status;
 public enum JobManagerDeploymentStatus {
 
     /** JobManager is running and ready to receive REST API calls. */
-    READY,
+    READY("JobManagerReady", "JobManager is running and ready to receive REST 
API calls"),
 
     /** JobManager is running but not ready yet to receive REST API calls. */
-    DEPLOYED_NOT_READY,
+    DEPLOYED_NOT_READY(
+            "DeployedNotReady",
+            "JobManager is running but not yet ready to receive REST API 
calls"),
 
     /** JobManager process is starting up. */
-    DEPLOYING,
+    DEPLOYING("JobManagerIsDeploying", "JobManager process is starting up"),
 
     /** JobManager deployment not found, probably not started or killed by 
user. */
     // TODO: currently a mix of SUSPENDED and ERROR, needs cleanup
-    MISSING,
+    MISSING("JobManagerDeploymentMissing", "JobManager deployment not found"),
 
     /** Deployment in terminal error, requires spec change for reconciliation 
to continue. */
-    ERROR;
+    ERROR("Error", "JobManager deployment failed");
+
+    private String reason;
+    private String message;
+
+    JobManagerDeploymentStatus(String reason, String message) {
+        this.reason = reason;
+        this.message = message;
+    }
+
+    public String getReason() {
+        return reason;
+    }
+
+    public String getMessage() {
+        return message;
+    }
 
     public boolean isRestApiAvailable() {
         return this == READY;
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionsUtils.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionsUtils.java
new file mode 100644
index 00000000..d8a25a59
--- /dev/null
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionsUtils.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.utils;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
+import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+
+import io.fabric8.kubernetes.api.model.Condition;
+import io.fabric8.kubernetes.api.model.ConditionBuilder;
+
+import java.time.Instant;
+import java.util.List;
+
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+import static 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus.READY;
+
+/** Utility methods for working with Kubernetes {@link Condition} objects. */
+public class ConditionsUtils {
+
+    /**
+     * Updates the last transition time of the given condition based on the 
existing conditions
+     * list. If a condition of the same type already exists and has the same 
status, the existing
+     * transition time is preserved; otherwise a new timestamp is set.
+     *
+     * @param conditions the current list of conditions
+     * @param condition the new condition whose last transition time should be 
updated
+     */
+    public static void updateLastTransitionTime(List<Condition> conditions, 
Condition condition) {
+        if (condition == null) {
+            return;
+        }
+        Condition existingCondition =
+                conditions.stream()
+                        .filter(c -> c.getType().equals(condition.getType()))
+                        .findFirst()
+                        .orElse(null);
+        
condition.setLastTransitionTime(getLastTransitionTimeStamp(existingCondition, 
condition));
+    }
+
+    /**
+     * Creates a Running condition for application mode based on the given 
{@link JobStatus}.
+     *
+     * @param jobStatus the current job status
+     * @return a {@link Condition} reflecting whether the job is running
+     */
+    public static Condition createApplicationModeCondition(JobStatus 
jobStatus) {
+        return new ConditionBuilder()
+                .withType(FlinkDeploymentStatus.CONDITION_TYPE_RUNNING)
+                .withStatus(jobStatus == RUNNING ? "True" : "False")
+                .withReason(toCamelCase(jobStatus.name()))
+                .withMessage("Job status " + jobStatus.name())
+                .build();
+    }
+
+    /**
+     * Creates a Running condition for session mode based on the given {@link
+     * JobManagerDeploymentStatus}.
+     *
+     * @param jmStatus the current JobManager deployment status
+     * @return a {@link Condition} reflecting whether the session cluster is 
running
+     */
+    public static Condition 
createSessionModeCondition(JobManagerDeploymentStatus jmStatus) {
+        return new ConditionBuilder()
+                .withType(FlinkDeploymentStatus.CONDITION_TYPE_RUNNING)
+                .withStatus(jmStatus == READY ? "True" : "False")
+                .withReason(jmStatus.getReason())
+                .withMessage(jmStatus.getMessage())
+                .build();
+    }
+
+    /**
+     * Converts a string to CamelCase by lower-casing it and upper-casing the 
first letter. Reason
+     * in the condition object should be a CamelCase string, so need to 
convert JobStatus as all the
+     * keywords are one noun, so we only need to upper case the first letter.
+     *
+     * @param reason the string to convert
+     * @return CamelCase reason as String
+     */
+    private static String toCamelCase(String reason) {
+        reason = reason.toLowerCase();
+        return reason.substring(0, 1).toUpperCase() + reason.substring(1);
+    }
+
+    /**
+     * Gets the last transition time for the condition. Returns the current 
time if there is no
+     * existing condition or if the condition status has changed, otherwise 
returns the existing
+     * condition's LastTransitionTime.
+     *
+     * @param existingCondition the current condition object, may be null
+     * @param condition the new condition object to compare against the 
existing one
+     * @return a string representing the last transition time in ISO 8601 
format with nanosecond
+     *     precision (e.g., "2025-10-30T07:35:35.189752790Z"). Returns a new 
timestamp if the
+     *     existing condition is null or the status has changed, otherwise 
returns the last
+     *     transition time of the existing condition.
+     */
+    private static String getLastTransitionTimeStamp(
+            Condition existingCondition, Condition condition) {
+        String lastTransitionTime;
+        if (existingCondition == null
+                || 
!existingCondition.getStatus().equals(condition.getStatus())) {
+            lastTransitionTime = Instant.now().toString();
+        } else {
+            lastTransitionTime = existingCondition.getLastTransitionTime();
+        }
+        return lastTransitionTime;
+    }
+}
diff --git 
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatusConditionsTest.java
 
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatusConditionsTest.java
new file mode 100644
index 00000000..0800dcbc
--- /dev/null
+++ 
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatusConditionsTest.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.status;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils;
+
+import io.fabric8.kubernetes.api.model.Condition;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link FlinkDeploymentStatus#setRunningConditionIfRequired()}. */
+class FlinkDeploymentStatusConditionsTest {
+
+    /**
+     * Helper method to get a condition by type from a list of conditions.
+     *
+     * @param conditions the list of conditions
+     * @param type the condition type to find
+     * @return the condition with the specified type, or null if not found
+     */
+    private Condition getConditionByType(List<Condition> conditions, String 
type) {
+        return conditions.stream().filter(c -> 
type.equals(c.getType())).findFirst().orElse(null);
+    }
+
+    @Test
+    void testCreateConditionFromStatusWithNullReconciliationStatus() {
+        FlinkDeploymentStatus status = new FlinkDeploymentStatus();
+        status.setReconciliationStatus(null);
+
+        List<Condition> conditions = status.setRunningConditionIfRequired();
+
+        assertTrue(
+                conditions.isEmpty(),
+                "Should return empty list when reconciliation status is null");
+    }
+
+    @Test
+    void testCreateConditionFromStatusWithNullDeploymentSpec() {
+        FlinkDeploymentStatus status = new FlinkDeploymentStatus();
+        FlinkDeploymentReconciliationStatus reconciliationStatus =
+                new FlinkDeploymentReconciliationStatus();
+        status.setReconciliationStatus(reconciliationStatus);
+
+        List<Condition> conditions = status.setRunningConditionIfRequired();
+
+        assertTrue(conditions.isEmpty(), "Should return empty list when 
deployment spec is null");
+    }
+
+    @Test
+    void testApplicationModeConditionWithRunningJob() {
+        FlinkDeploymentStatus status = 
BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING);
+
+        List<Condition> conditions = status.setRunningConditionIfRequired();
+
+        assertEquals(1, conditions.size(), "Should return one condition");
+        Condition condition =
+                getConditionByType(conditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(condition);
+        assertEquals(FlinkDeploymentStatus.CONDITION_TYPE_RUNNING, 
condition.getType());
+        assertEquals("True", condition.getStatus());
+        assertEquals("Running", condition.getReason());
+        assertEquals("Job status RUNNING", condition.getMessage());
+        assertNotNull(condition.getLastTransitionTime());
+    }
+
+    @Test
+    void testApplicationModeConditionWithFailedJob() {
+        FlinkDeploymentStatus status = 
BaseTestUtils.createApplicationModeStatus(JobStatus.FAILED);
+
+        List<Condition> conditions = status.setRunningConditionIfRequired();
+
+        assertEquals(1, conditions.size());
+        Condition condition =
+                getConditionByType(conditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(condition);
+        assertEquals(FlinkDeploymentStatus.CONDITION_TYPE_RUNNING, 
condition.getType());
+        assertEquals("False", condition.getStatus());
+        assertEquals("Failed", condition.getReason());
+        assertEquals("Job status FAILED", condition.getMessage());
+        assertNotNull(condition.getLastTransitionTime());
+    }
+
+    @Test
+    void testApplicationModeConditionWithCanceledJob() {
+        FlinkDeploymentStatus status =
+                BaseTestUtils.createApplicationModeStatus(JobStatus.CANCELED);
+
+        List<Condition> conditions = status.setRunningConditionIfRequired();
+
+        assertEquals(1, conditions.size());
+        Condition condition =
+                getConditionByType(conditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(condition);
+        assertEquals("False", condition.getStatus());
+        assertEquals("Canceled", condition.getReason());
+        assertEquals("Job status CANCELED", condition.getMessage());
+        assertNotNull(condition.getLastTransitionTime());
+    }
+
+    @Test
+    void testApplicationModeConditionWithFinishedJob() {
+        FlinkDeploymentStatus status =
+                BaseTestUtils.createApplicationModeStatus(JobStatus.FINISHED);
+
+        List<Condition> conditions = status.setRunningConditionIfRequired();
+
+        assertEquals(1, conditions.size());
+        Condition condition =
+                getConditionByType(conditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(condition);
+        assertEquals("False", condition.getStatus());
+        assertEquals("Finished", condition.getReason());
+        assertEquals("Job status FINISHED", condition.getMessage());
+        assertNotNull(condition.getLastTransitionTime());
+    }
+
+    @Test
+    void testApplicationModeConditionWithCreatedJob() {
+        FlinkDeploymentStatus status = 
BaseTestUtils.createApplicationModeStatus(JobStatus.CREATED);
+
+        List<Condition> conditions = status.setRunningConditionIfRequired();
+
+        assertEquals(1, conditions.size());
+        Condition condition =
+                getConditionByType(conditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(condition);
+        assertEquals("False", condition.getStatus());
+        assertEquals("Created", condition.getReason());
+        assertNotNull(condition.getLastTransitionTime());
+    }
+
+    @Test
+    void testSessionModeConditionWithReadyJobManager() {
+        FlinkDeploymentStatus status =
+                
BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.READY);
+
+        List<Condition> conditions = status.setRunningConditionIfRequired();
+
+        assertEquals(1, conditions.size());
+        Condition condition =
+                getConditionByType(conditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(condition);
+        assertEquals(FlinkDeploymentStatus.CONDITION_TYPE_RUNNING, 
condition.getType());
+        assertEquals("True", condition.getStatus());
+        assertEquals(JobManagerDeploymentStatus.READY.getReason(), 
condition.getReason());
+        assertEquals(JobManagerDeploymentStatus.READY.getMessage(), 
condition.getMessage());
+        assertNotNull(condition.getLastTransitionTime());
+    }
+
+    @Test
+    void testSessionModeConditionWithDeployingJobManager() {
+        FlinkDeploymentStatus status =
+                
BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.DEPLOYING);
+
+        List<Condition> conditions = status.setRunningConditionIfRequired();
+
+        assertEquals(1, conditions.size());
+        Condition condition =
+                getConditionByType(conditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(condition);
+        assertEquals("False", condition.getStatus());
+        assertEquals(JobManagerDeploymentStatus.DEPLOYING.getReason(), 
condition.getReason());
+        assertNotNull(condition.getLastTransitionTime());
+    }
+
+    @Test
+    void testSessionModeConditionWithMissingJobManager() {
+        FlinkDeploymentStatus status =
+                
BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.MISSING);
+
+        List<Condition> conditions = status.setRunningConditionIfRequired();
+
+        assertEquals(1, conditions.size());
+        Condition condition =
+                getConditionByType(conditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(condition);
+        assertEquals("False", condition.getStatus());
+        assertNotNull(condition.getLastTransitionTime());
+    }
+
+    @Test
+    void testConditionTypeIsAlwaysRunning() {
+        // Test application mode
+        FlinkDeploymentStatus appStatus =
+                BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING);
+        List<Condition> appConditions = 
appStatus.setRunningConditionIfRequired();
+        Condition appCondition =
+                getConditionByType(appConditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(appCondition);
+        assertEquals(FlinkDeploymentStatus.CONDITION_TYPE_RUNNING, 
appCondition.getType());
+
+        // Test session mode
+        FlinkDeploymentStatus sessionStatus =
+                
BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.READY);
+        List<Condition> sessionConditions = 
sessionStatus.setRunningConditionIfRequired();
+        Condition sessionCondition =
+                getConditionByType(sessionConditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(sessionCondition);
+        assertEquals(FlinkDeploymentStatus.CONDITION_TYPE_RUNNING, 
sessionCondition.getType());
+    }
+
+    @Test
+    void testGetLastTransitionTimeStamp_StatusUnchanged() throws 
InterruptedException {
+        // Test that timestamp is preserved when status doesn't change
+        FlinkDeploymentStatus status = 
BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING);
+
+        // First call - creates initial condition with timestamp
+        List<Condition> firstConditions = 
status.setRunningConditionIfRequired();
+        Condition firstCondition =
+                getConditionByType(firstConditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(firstCondition);
+        String firstTimestamp = firstCondition.getLastTransitionTime();
+
+        // Add the condition to status
+        status.setConditions(List.of(firstCondition));
+
+        // Second call - status unchanged (still RUNNING)
+        List<Condition> secondConditions = 
status.setRunningConditionIfRequired();
+        Condition secondCondition =
+                getConditionByType(secondConditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(secondCondition);
+        String secondTimestamp = secondCondition.getLastTransitionTime();
+
+        // Timestamp should be preserved since status didn't change
+        assertEquals(firstTimestamp, secondTimestamp);
+    }
+
+    @Test
+    void testGetLastTransitionTimeStamp_StatusChanged() throws 
InterruptedException {
+        // Test that timestamp is updated when status changes
+        FlinkDeploymentStatus status = 
BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING);
+
+        // First call - creates initial condition with RUNNING status
+        List<Condition> firstConditions = 
status.setRunningConditionIfRequired();
+        Condition firstCondition =
+                getConditionByType(firstConditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(firstCondition);
+        String firstTimestamp = firstCondition.getLastTransitionTime();
+
+        // Add the condition to status
+        status.setConditions(List.of(firstCondition));
+
+        // Change status to FAILED
+        status.getJobStatus().setState(JobStatus.FAILED);
+
+        // Second call - status changed to FAILED
+        List<Condition> secondConditions = 
status.setRunningConditionIfRequired();
+        Condition secondCondition =
+                getConditionByType(secondConditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(secondCondition);
+        String secondTimestamp = secondCondition.getLastTransitionTime();
+
+        // Timestamp should be different since status changed
+        assertTrue(
+                !firstTimestamp.equals(secondTimestamp),
+                "Timestamp should be updated when status changes");
+    }
+
+    @Test
+    void testGetLastTransitionTimeStamp_SessionMode() throws 
InterruptedException {
+        // Test timestamp behavior in session mode
+        FlinkDeploymentStatus status =
+                
BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.READY);
+
+        // First call
+        List<Condition> firstConditions = 
status.setRunningConditionIfRequired();
+        Condition firstCondition =
+                getConditionByType(firstConditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(firstCondition);
+        String firstTimestamp = firstCondition.getLastTransitionTime();
+
+        // Add condition to status
+        status.setConditions(List.of(firstCondition));
+
+        // Second call with same status
+        List<Condition> secondConditions = 
status.setRunningConditionIfRequired();
+        Condition secondCondition =
+                getConditionByType(secondConditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(secondCondition);
+        String secondTimestamp = secondCondition.getLastTransitionTime();
+        // Timestamp should be preserved
+        assertEquals(firstTimestamp, secondTimestamp);
+
+        // Change to DEPLOYING
+        
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+
+        // Third call with changed status
+        List<Condition> thirdConditions = 
status.setRunningConditionIfRequired();
+        Condition thirdCondition =
+                getConditionByType(thirdConditions, 
FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(thirdCondition);
+        String thirdTimestamp = thirdCondition.getLastTransitionTime();
+        // Timestamp should be updated
+        assertTrue(
+                !secondTimestamp.equals(thirdTimestamp),
+                "Timestamp should be updated when JobManager status changes");
+    }
+}
diff --git 
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
 
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
index f3bd54f2..5a47bbd2 100644
--- 
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
+++ 
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.api.utils;
 
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -39,8 +40,10 @@ import 
org.apache.flink.kubernetes.operator.api.spec.SavepointSpec;
 import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.api.status.CheckpointType;
+import 
org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
+import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
@@ -49,6 +52,7 @@ import io.fabric8.kubernetes.api.model.PodSpec;
 import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
@@ -268,4 +272,43 @@ public class BaseTestUtils {
 
         return snapshot;
     }
+
+    public static FlinkDeploymentStatus createApplicationModeStatus(JobStatus 
jobStatus) {
+        FlinkDeploymentStatus status = new FlinkDeploymentStatus();
+        org.apache.flink.kubernetes.operator.api.status.JobStatus 
flinkJobStatus =
+                new 
org.apache.flink.kubernetes.operator.api.status.JobStatus();
+        flinkJobStatus.setState(jobStatus);
+        status.setJobStatus(flinkJobStatus);
+        status.setConditions(new ArrayList<>());
+
+        FlinkDeploymentReconciliationStatus reconciliationStatus =
+                new FlinkDeploymentReconciliationStatus();
+
+        FlinkDeployment deployment = BaseTestUtils.buildApplicationCluster();
+        String serializedSpec = 
SpecUtils.writeSpecWithMeta(deployment.getSpec(), deployment);
+        reconciliationStatus.setLastReconciledSpec(serializedSpec);
+
+        status.setReconciliationStatus(reconciliationStatus);
+        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+
+        return status;
+    }
+
+    public static FlinkDeploymentStatus createSessionModeStatus(
+            JobManagerDeploymentStatus jmStatus) {
+        FlinkDeploymentStatus status = new FlinkDeploymentStatus();
+        status.setJobManagerDeploymentStatus(jmStatus);
+        status.setConditions(new ArrayList<>());
+
+        FlinkDeploymentReconciliationStatus reconciliationStatus =
+                new FlinkDeploymentReconciliationStatus();
+
+        FlinkDeployment deployment = BaseTestUtils.buildSessionCluster();
+        String serializedSpec = 
SpecUtils.writeSpecWithMeta(deployment.getSpec(), deployment);
+        reconciliationStatus.setLastReconciledSpec(serializedSpec);
+
+        status.setReconciliationStatus(reconciliationStatus);
+
+        return status;
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 32a3109d..69909c5c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -165,6 +165,7 @@ public class FlinkDeploymentController
             throw new ReconciliationException(e);
         }
 
+        flinkApp.getStatus().setRunningConditionIfRequired();
         LOG.debug("End of reconciliation");
         statusRecorder.patchAndCacheStatus(flinkApp, 
ctx.getKubernetesClient());
         return ReconciliationUtils.toUpdateControl(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index 90555407..8f12183d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -27,7 +27,7 @@ import 
org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
 
-/** The observer of {@link 
org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
+/** The observer of {@link 
org.apache.flink.kubernetes.operator.api.Mode#APPLICATION} cluster. */
 public class ApplicationObserver extends AbstractFlinkDeploymentObserver {
 
     private final SnapshotObserver<FlinkDeployment, FlinkDeploymentStatus> 
savepointObserver;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java
index c5e11241..6facfb16 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java
@@ -19,8 +19,8 @@ package 
org.apache.flink.kubernetes.operator.observer.deployment;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.Mode;
 import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
-import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.observer.Observer;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
index 84f213e7..4b0ba0d3 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
 import java.util.concurrent.TimeoutException;
 
-/** The observer of the {@link 
org.apache.flink.kubernetes.operator.config.Mode#SESSION} cluster. */
+/** The observer of the {@link 
org.apache.flink.kubernetes.operator.api.Mode#SESSION} cluster. */
 public class SessionObserver extends AbstractFlinkDeploymentObserver {
 
     public SessionObserver(EventRecorder eventRecorder) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index 429caaa6..3a389a79 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -20,10 +20,10 @@ package 
org.apache.flink.kubernetes.operator.reconciler.deployment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.autoscaler.JobAutoScaler;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.Mode;
 import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
-import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index 7c483f7f..ac3dab33 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -27,10 +27,10 @@ import 
org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.Mode;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
-import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import 
org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
 import 
org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 4d379139..65cdfaf0 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -130,7 +130,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING,
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(7, testController.getInternalStatusUpdateCount());
+        assertEquals(8, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isPatchStatus());
 
         FlinkDeploymentReconciliationStatus reconciliationStatus =
@@ -277,6 +277,8 @@ public class FlinkDeploymentControllerTest {
 
         validatingResponseProvider.assertValidated();
 
+        validateConditionStatus(appCluster, "Reconciling");
+
         // Validate status
         assertNotNull(appCluster.getStatus().getError());
 
@@ -359,7 +361,7 @@ public class FlinkDeploymentControllerTest {
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals(
                 "savepoint_1", 
appCluster.getStatus().getJobStatus().getUpgradeSavepointPath());
-
+        validateConditionStatus(appCluster, "Finished");
         // Resume from last savepoint
         appCluster.getSpec().getJob().setState(JobState.RUNNING);
         testController.reconcile(appCluster, context);
@@ -618,6 +620,7 @@ public class FlinkDeploymentControllerTest {
         // jobStatus has not been set at this time
         assertEquals(org.apache.flink.api.common.JobStatus.RECONCILING, 
jobStatus.getState());
 
+        validateConditionStatus(appCluster, "Reconciling");
         // Switches operator mode to SESSION
         appCluster.getSpec().setJob(null);
         // Validation fails and JobObserver should still be used
@@ -639,6 +642,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(expectedJobStatus.getJobId().toHexString(), 
jobStatus.getJobId());
         assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
         assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
+        validateConditionStatus(appCluster, "Running");
     }
 
     @Test
@@ -652,6 +656,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
+        validateConditionStatus(appCluster, "JobManagerIsDeploying");
 
         updateControl = testController.reconcile(appCluster, context);
         JobStatus jobStatus = appCluster.getStatus().getJobStatus();
@@ -662,6 +667,8 @@ public class FlinkDeploymentControllerTest {
         // jobStatus has not been set at this time
         assertNull(jobStatus.getState());
 
+        validateConditionStatus(appCluster, "DeployedNotReady");
+
         // Switches operator mode to APPLICATION
         
appCluster.getSpec().setJob(TestUtils.buildSessionJob().getSpec().getJob());
         // Validation fails and JobObserver should still be used
@@ -676,6 +683,8 @@ public class FlinkDeploymentControllerTest {
                         .getError()
                         .contains("Cannot switch from session to job 
cluster"));
         assertNull(ReconciliationUtils.getDeployedSpec(appCluster).getJob());
+
+        validateConditionStatus(appCluster, "JobManagerReady");
     }
 
     private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws 
Exception {
@@ -1169,7 +1178,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING,
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(6, testController.getInternalStatusUpdateCount());
+        assertEquals(7, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isPatchStatus());
         assertEquals(
                 Optional.of(
@@ -1184,7 +1193,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 org.apache.flink.api.common.JobStatus.RUNNING,
                 appCluster.getStatus().getJobStatus().getState());
-        assertEquals(6, testController.getInternalStatusUpdateCount());
+        assertEquals(7, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isPatchStatus());
         assertEquals(
                 Optional.of(
@@ -1254,4 +1263,12 @@ public class FlinkDeploymentControllerTest {
             return ingressRuleV1beta1.getHost();
         }
     }
+
+    private void validateConditionStatus(FlinkDeployment appCluster, String 
reason) {
+        assertThat(appCluster.getStatus().getConditions()).isNotNull();
+        assertThat(appCluster.getStatus().getConditions())
+                .hasSize(1)
+                .extracting("reason")
+                .contains(reason);
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
index fde2b124..a871466b 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
@@ -24,11 +24,11 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.Mode;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
 import org.apache.flink.util.concurrent.Executors;
 
diff --git 
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 33ab49e8..c81b2b10 100644
--- 
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -10685,6 +10685,23 @@ spec:
                 additionalProperties:
                   type: string
                 type: object
+              conditions:
+                items:
+                  properties:
+                    lastTransitionTime:
+                      type: string
+                    message:
+                      type: string
+                    observedGeneration:
+                      type: integer
+                    reason:
+                      type: string
+                    status:
+                      type: string
+                    type:
+                      type: string
+                  type: object
+                type: array
               error:
                 type: string
               jobManagerDeploymentStatus:

Reply via email to